This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e9e49ff  [BEAM-13015] Use 20% of memory when the maximum has been 
configured. (#16420)
e9e49ff is described below

commit e9e49ff1fcb516350de7ad6affc80455373d7f38
Author: Lukasz Cwik <[email protected]>
AuthorDate: Tue Jan 4 20:00:47 2022 -0800

    [BEAM-13015] Use 20% of memory when the maximum has been configured. 
(#16420)
    
    * [BEAM-13015] Use 20% of memory when the maximum has been configured.
    
    The boot.go always sets the -Xmx value but we need a fallback in case it is used 
in different environments.
---
 .../apache/beam/sdk/options/SdkHarnessOptions.java | 54 +++++++++++++++++++---
 .../beam/sdk/options/SdkHarnessOptionsTest.java    | 48 ++++++++++++++++---
 2 files changed, 90 insertions(+), 12 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
index 4578873..022d521 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.util.InstanceBuilder;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.index.qual.NonNegative;
 
 /** Options that are used to control configuration of the SDK harness. */
@@ -104,6 +105,10 @@ public interface SdkHarnessOptions extends PipelineOptions 
{
    * user state.
    *
    * <p>CAUTION: If set too large, SDK harness instances may run into OOM 
conditions more easily.
+   *
+   * <p>See {@link DefaultMaxCacheMemoryUsageMbFactory} for details on how 
{@link
+   * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb} is computed if this 
parameter is
+   * unspecified.
    */
   @Description(
       "The size (in MB) for the process wide cache within the SDK harness. The 
cache is responsible for "
@@ -116,11 +121,33 @@ public interface SdkHarnessOptions extends 
PipelineOptions {
   void setMaxCacheMemoryUsageMb(@NonNegative int value);
 
   /**
+   * Size (in % [0 - 100]) for the process wide cache within the SDK harness. 
The cache is
+   * responsible for storing all values which are cached within a bundle and 
across bundles such as
+   * side inputs and user state.
+   *
+   * <p>This parameter will only be used if an explicit value was not 
specified for {@link
+   * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb}.
+   */
+  @Description(
+      "The size (in % [0 - 100]) for the process wide cache within the SDK 
harness. The cache is responsible for "
+          + "storing all values which are cached within a bundle and across 
bundles such as side inputs "
+          + "and user state. CAUTION: If set too large, SDK harness instances 
may run into OOM conditions more easily.")
+  @Default.Float(20)
+  @NonNegative
+  float getMaxCacheMemoryUsagePercent();
+
+  void setMaxCacheMemoryUsagePercent(@NonNegative float value);
+
+  /**
    * An instance of this class will be used to specify the maximum amount of 
memory to allocate to a
    * cache within an SDK harness instance.
    *
    * <p>This parameter will only be used if an explicit value was not 
specified for {@link
    * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb}.
+   *
+   * <p>See {@link DefaultMaxCacheMemoryUsageMb} for details on how {@link
+   * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb} is computed if this 
parameter is
+   * unspecified.
    */
   @Description(
       "An instance of this class will be used to specify the maximum amount of 
memory to allocate to a "
@@ -131,8 +158,9 @@ public interface SdkHarnessOptions extends PipelineOptions {
   void setMaxCacheMemoryUsageMbClass(Class<? extends MaxCacheMemoryUsageMb> 
kls);
 
   /**
-   * A {@link DefaultValueFactory} which specifies the maximum amount of 
memory to allocate to the
-   * process wide cache within an SDK harness instance.
+   * A {@link DefaultValueFactory} which constructs an instance of the class 
specified by {@link
+   * #getMaxCacheMemoryUsageMbClass maxCacheMemoryUsageMbClass} to compute the 
maximum amount of
+   * memory to allocate to the process wide cache within an SDK harness 
instance.
    */
   class DefaultMaxCacheMemoryUsageMbFactory implements 
DefaultValueFactory<@NonNegative Integer> {
 
@@ -157,14 +185,28 @@ public interface SdkHarnessOptions extends 
PipelineOptions {
   /**
    * The default implementation which detects how much memory to use for a 
process wide cache.
    *
-   * <p>TODO(BEAM-13015): Detect the amount of memory to use instead of 
hard-coding to 100.
+   * <p>If the {@link Runtime} provides a maximum amount of memory (typically 
specified with {@code
+   * -Xmx} JVM argument), then {@link #getMaxCacheMemoryUsagePercent 
maxCacheMemoryUsagePercent}
+   * will be used to compute the upper bound as a percentage of the maximum 
amount of memory.
+   * Otherwise {@code 100} is returned.
    */
   class DefaultMaxCacheMemoryUsageMb implements MaxCacheMemoryUsageMb {
     @Override
     public int getMaxCacheMemoryUsage(PipelineOptions options) {
-      // TODO(BEAM-13015): Detect environment type and produce a value based 
upon the maximum amount
-      // of memory available.
-      return 100;
+      return getMaxCacheMemoryUsage(options, Runtime.getRuntime().maxMemory());
+    }
+
+    @VisibleForTesting
+    int getMaxCacheMemoryUsage(PipelineOptions options, long maxMemory) {
+      if (maxMemory == Long.MAX_VALUE) {
+        return 100;
+      }
+      float maxPercent = 
options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsagePercent();
+      if (maxPercent < 0 || maxPercent > 100) {
+        throw new IllegalArgumentException(
+            "--maxCacheMemoryUsagePercent must be between 0 and 100.");
+      }
+      return (int) (maxMemory / 1048576. * maxPercent / 100.);
     }
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
index c574495..9c8f70a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
@@ -19,14 +19,14 @@ package org.apache.beam.sdk.options;
 
 import static org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel.WARN;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.beam.sdk.options.SdkHarnessOptions.DefaultMaxCacheMemoryUsageMb;
 import 
org.apache.beam.sdk.options.SdkHarnessOptions.SdkHarnessLogLevelOverrides;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -36,13 +36,49 @@ public class SdkHarnessOptionsTest {
   private static final ObjectMapper MAPPER =
       new ObjectMapper()
           
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testDefaultMaxCacheMemoryUsageMbWhenRuntimeReturnsInvalidValue() 
{
+    assertEquals(
+        100,
+        new DefaultMaxCacheMemoryUsageMb()
+            .getMaxCacheMemoryUsage(PipelineOptionsFactory.create(), 
Long.MAX_VALUE));
+  }
+
+  @Test
+  public void testDefaultMaxCacheMemoryUsageMbWhenRuntimeReturnsValidValue() {
+    assertEquals(
+        20,
+        new DefaultMaxCacheMemoryUsageMb()
+            .getMaxCacheMemoryUsage(PipelineOptionsFactory.create(), 100 * 
1024 * 1024));
+  }
+
+  @Test
+  public void testDefaultMaxCacheMemoryUsageMbWhenInvalidPercentage() {
+    assertThrows(
+        "maxCacheMemoryUsagePercent must be between 0 and 100",
+        IllegalArgumentException.class,
+        () ->
+            new DefaultMaxCacheMemoryUsageMb()
+                .getMaxCacheMemoryUsage(
+                    
PipelineOptionsFactory.fromArgs("--maxCacheMemoryUsagePercent=-1").create(),
+                    100 * 1024 * 1024));
+    assertThrows(
+        "maxCacheMemoryUsagePercent must be between 0 and 100",
+        IllegalArgumentException.class,
+        () ->
+            new DefaultMaxCacheMemoryUsageMb()
+                .getMaxCacheMemoryUsage(
+                    
PipelineOptionsFactory.fromArgs("--maxCacheMemoryUsagePercent=101").create(),
+                    100 * 1024 * 1024));
+  }
 
   @Test
   public void testSdkHarnessLogLevelOverrideWithInvalidLogLevel() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Unsupported log level");
-    SdkHarnessLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
+    assertThrows(
+        "Unsupported log level",
+        IllegalArgumentException.class,
+        () -> SdkHarnessLogLevelOverrides.from(ImmutableMap.of("Name", 
"FakeLevel")));
   }
 
   @Test

Reply via email to