This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e9e49ff [BEAM-13015] Use 20% of memory when the maximum has been
configured. (#16420)
e9e49ff is described below
commit e9e49ff1fcb516350de7ad6affc80455373d7f38
Author: Lukasz Cwik <[email protected]>
AuthorDate: Tue Jan 4 20:00:47 2022 -0800
[BEAM-13015] Use 20% of memory when the maximum has been configured.
(#16420)
* [BEAM-13015] Use 20% of memory when the maximum has been configured.
boot.go always sets the -Xmx value, but we need a fallback in case the SDK
harness is used in other environments.
---
.../apache/beam/sdk/options/SdkHarnessOptions.java | 54 +++++++++++++++++++---
.../beam/sdk/options/SdkHarnessOptionsTest.java | 48 ++++++++++++++++---
2 files changed, 90 insertions(+), 12 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
index 4578873..022d521 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.util.InstanceBuilder;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.index.qual.NonNegative;
/** Options that are used to control configuration of the SDK harness. */
@@ -104,6 +105,10 @@ public interface SdkHarnessOptions extends PipelineOptions
{
* user state.
*
* <p>CAUTION: If set too large, SDK harness instances may run into OOM
conditions more easily.
+ *
+ * <p>See {@link DefaultMaxCacheMemoryUsageMbFactory} for details on how
{@link
+ * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb} is computed if this
parameter is
+ * unspecified.
*/
@Description(
"The size (in MB) for the process wide cache within the SDK harness. The
cache is responsible for "
@@ -116,11 +121,33 @@ public interface SdkHarnessOptions extends
PipelineOptions {
void setMaxCacheMemoryUsageMb(@NonNegative int value);
/**
+ * Size (in % [0 - 100]) for the process wide cache within the SDK harness.
The cache is
+ * responsible for storing all values which are cached within a bundle and
across bundles such as
+ * side inputs and user state.
+ *
+ * <p>This parameter will only be used if an explicit value was not
specified for {@link
+ * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb}.
+ */
+ @Description(
+ "The size (in % [0 - 100]) for the process wide cache within the SDK
harness. The cache is responsible for "
+ + "storing all values which are cached within a bundle and across
bundles such as side inputs "
+ + "and user state. CAUTION: If set too large, SDK harness instances
may run into OOM conditions more easily.")
+ @Default.Float(20)
+ @NonNegative
+ float getMaxCacheMemoryUsagePercent();
+
+ void setMaxCacheMemoryUsagePercent(@NonNegative float value);
+
+ /**
* An instance of this class will be used to specify the maximum amount of
memory to allocate to a
* cache within an SDK harness instance.
*
* <p>This parameter will only be used if an explicit value was not
specified for {@link
* #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb}.
+ *
+ * <p>See {@link DefaultMaxCacheMemoryUsageMb} for details on how {@link
+ * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb} is computed if this
parameter is
+ * unspecified.
*/
@Description(
"An instance of this class will be used to specify the maximum amount of
memory to allocate to a "
@@ -131,8 +158,9 @@ public interface SdkHarnessOptions extends PipelineOptions {
void setMaxCacheMemoryUsageMbClass(Class<? extends MaxCacheMemoryUsageMb>
kls);
/**
- * A {@link DefaultValueFactory} which specifies the maximum amount of
memory to allocate to the
- * process wide cache within an SDK harness instance.
+ * A {@link DefaultValueFactory} which constructs an instance of the class
specified by {@link
+ * #getMaxCacheMemoryUsageMbClass maxCacheMemoryUsageMbClass} to compute the
maximum amount of
+ * memory to allocate to the process wide cache within an SDK harness
instance.
*/
class DefaultMaxCacheMemoryUsageMbFactory implements
DefaultValueFactory<@NonNegative Integer> {
@@ -157,14 +185,28 @@ public interface SdkHarnessOptions extends
PipelineOptions {
/**
* The default implementation which detects how much memory to use for a
process wide cache.
*
- * <p>TODO(BEAM-13015): Detect the amount of memory to use instead of
hard-coding to 100.
+ * <p>If the {@link Runtime} provides a maximum amount of memory (typically
specified with {@code
+ * -Xmx} JVM argument), then {@link #getMaxCacheMemoryUsagePercent
maxCacheMemoryUsagePercent}
+ * will be used to compute the upper bound as a percentage of the maximum
amount of memory.
+ * Otherwise {@code 100} is returned.
*/
class DefaultMaxCacheMemoryUsageMb implements MaxCacheMemoryUsageMb {
@Override
public int getMaxCacheMemoryUsage(PipelineOptions options) {
- // TODO(BEAM-13015): Detect environment type and produce a value based
upon the maximum amount
- // of memory available.
- return 100;
+ return getMaxCacheMemoryUsage(options, Runtime.getRuntime().maxMemory());
+ }
+
+ @VisibleForTesting
+ int getMaxCacheMemoryUsage(PipelineOptions options, long maxMemory) {
+ if (maxMemory == Long.MAX_VALUE) {
+ return 100;
+ }
+ float maxPercent =
options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsagePercent();
+ if (maxPercent < 0 || maxPercent > 100) {
+ throw new IllegalArgumentException(
+ "--maxCacheMemoryUsagePercent must be between 0 and 100.");
+ }
+ return (int) (maxMemory / 1048576. * maxPercent / 100.);
}
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
index c574495..9c8f70a 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
@@ -19,14 +19,14 @@ package org.apache.beam.sdk.options;
import static org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel.WARN;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
import com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.beam.sdk.options.SdkHarnessOptions.DefaultMaxCacheMemoryUsageMb;
import
org.apache.beam.sdk.options.SdkHarnessOptions.SdkHarnessLogLevelOverrides;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -36,13 +36,49 @@ public class SdkHarnessOptionsTest {
private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
- @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testDefaultMaxCacheMemoryUsageMbWhenRuntimeReturnsInvalidValue()
{
+ assertEquals(
+ 100,
+ new DefaultMaxCacheMemoryUsageMb()
+ .getMaxCacheMemoryUsage(PipelineOptionsFactory.create(),
Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testDefaultMaxCacheMemoryUsageMbWhenRuntimeReturnsValidValue() {
+ assertEquals(
+ 20,
+ new DefaultMaxCacheMemoryUsageMb()
+ .getMaxCacheMemoryUsage(PipelineOptionsFactory.create(), 100 *
1024 * 1024));
+ }
+
+ @Test
+ public void testDefaultMaxCacheMemoryUsageMbWhenInvalidPercentage() {
+ assertThrows(
+ "maxCacheMemoryUsagePercent must be between 0 and 100",
+ IllegalArgumentException.class,
+ () ->
+ new DefaultMaxCacheMemoryUsageMb()
+ .getMaxCacheMemoryUsage(
+
PipelineOptionsFactory.fromArgs("--maxCacheMemoryUsagePercent=-1").create(),
+ 100 * 1024 * 1024));
+ assertThrows(
+ "maxCacheMemoryUsagePercent must be between 0 and 100",
+ IllegalArgumentException.class,
+ () ->
+ new DefaultMaxCacheMemoryUsageMb()
+ .getMaxCacheMemoryUsage(
+
PipelineOptionsFactory.fromArgs("--maxCacheMemoryUsagePercent=101").create(),
+ 100 * 1024 * 1024));
+ }
@Test
public void testSdkHarnessLogLevelOverrideWithInvalidLogLevel() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage("Unsupported log level");
- SdkHarnessLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
+ assertThrows(
+ "Unsupported log level",
+ IllegalArgumentException.class,
+ () -> SdkHarnessLogLevelOverrides.from(ImmutableMap.of("Name",
"FakeLevel")));
}
@Test