This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f31eb6e SAMZA-2763: Support worker JVM opts for Samza Beam portable 
mode (#1689)
65f31eb6e is described below

commit 65f31eb6e7da19a39b082635d20f730059aac8cb
Author: Bharath Kumarasubramanian <[email protected]>
AuthorDate: Mon Nov 20 14:02:51 2023 -0800

    SAMZA-2763: Support worker JVM opts for Samza Beam portable mode (#1689)
    
    Summary: Support JVM options for worker process in Samza Beam portable mode
    Description: With portable mode support for Samza Beam, we want to tune and 
configure the JVM options for the worker process. This PR adds that support by 
introducing the worker.opts configuration and integrating it with autosizing.
    
    Changes:
    - Added worker.opts configuration
    - Added autosizing integration support for Xmx
    - Updated configuration table and website
    
    API Changes: None
    
    Usage Instructions: worker.opts can be set like any other Samza 
application configuration; however, it only applies to the Samza Beam portable 
execution mode and is ignored otherwise.
    
    Upgrade Instructions: None
---
 .../versioned/jobs/configuration-table.html        | 15 ++++++
 .../java/org/apache/samza/config/JobConfig.java    |  1 +
 .../apache/samza/config/ShellCommandConfig.java    | 52 +++++++++++++-----
 .../org/apache/samza/job/ShellCommandBuilder.java  |  1 +
 .../samza/config/TestShellCommandConfig.java       | 63 +++++++++++++++++++++-
 .../apache/samza/job/TestShellCommandBuilder.java  |  3 ++
 6 files changed, 119 insertions(+), 16 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index f4c8d4d7b..e00c983d8 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -2048,6 +2048,21 @@
                     </th>
                 </tr>
 
+                <tr>
+                    <td class="property" id="worker-opts">worker.opts</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        Any JVM options to include in the command line when 
executing the worker process in portable execution of Samza using Beam. For example,
+                        this can be used to set the JVM heap size, to tune the 
garbage collector, or to enable
+                        <a 
href="/learn/tutorials/{{site.version}}/remote-debugging-samza.html">remote 
debugging</a>.
+                        Anything you put in <code>worker.opts</code> gets 
forwarded directly to the command line of the worker process as part of the JVM 
invocation.
+                        <b>Note:</b> The configuration only applies for Samza 
Beam portable mode.
+                        <dl>
+                            <dt>Example: 
<code>worker.opts=-XX:+HeapDumpOnOutOfMemoryError 
-XX:+UseConcMarkSweepGC</code></dt>
+                        </dl>
+                    </td>
+                </tr>
+
                 <tr>
                     <td class="property" 
id="yarn-package-path">yarn.package.path</td>
                     <td class="default"></td>
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index b9aa82cf2..3d0b53262 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -158,6 +158,7 @@ public class JobConfig extends MapConfig {
   public static final String JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB = 
JOB_AUTOSIZING_CONFIG_PREFIX + "container.maxheap.mb";
   public static final String JOB_AUTOSIZING_CONTAINER_MEMORY_MB = 
JOB_AUTOSIZING_CONFIG_PREFIX + "container.memory.mb";
   public static final String JOB_AUTOSIZING_CONTAINER_MAX_CORES = 
JOB_AUTOSIZING_CONFIG_PREFIX + "container.cpu.cores";
+  public static final String JOB_AUTOSIZING_WORKER_MAX_HEAP_MB = 
JOB_AUTOSIZING_CONFIG_PREFIX + "worker.maxheap.mb";
 
   public static final String COORDINATOR_STREAM_FACTORY = 
"job.coordinatorstream.config.factory";
   public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = 
"org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 73bcf8ec6..90780c81d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -18,7 +18,9 @@
  */
 package org.apache.samza.config;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 
 
 public class ShellCommandConfig extends MapConfig {
@@ -77,6 +79,7 @@ public class ShellCommandConfig extends MapConfig {
 
   public static final String COMMAND_SHELL_EXECUTE = "task.execute";
   public static final String TASK_JVM_OPTS = "task.opts";
+  public static final String WORKER_JVM_OPTS = "worker.opts";
   public static final String TASK_JAVA_HOME = "task.java.home";
 
   /**
@@ -97,20 +100,19 @@ public class ShellCommandConfig extends MapConfig {
   }
 
   public Optional<String> getTaskOpts() {
-    Optional<String> jvmOpts = 
Optional.ofNullable(get(ShellCommandConfig.TASK_JVM_OPTS));
-    Optional<String> maxHeapMbOptional = 
Optional.ofNullable(get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB));
-    if (new JobConfig(this).getAutosizingEnabled() && 
maxHeapMbOptional.isPresent()) {
-      String maxHeapMb = maxHeapMbOptional.get();
-      String xmxSetting = "-Xmx" + maxHeapMb + "m";
-      if (jvmOpts.isPresent() && jvmOpts.get().contains("-Xmx")) {
-        jvmOpts = Optional.of(jvmOpts.get().replaceAll("-Xmx\\S+", 
xmxSetting));
-      } else if (jvmOpts.isPresent()) {
-        jvmOpts = Optional.of(jvmOpts.get().concat(" " + xmxSetting));
-      } else {
-        jvmOpts = Optional.of(xmxSetting);
-      }
-    }
-    return jvmOpts;
+    String taskOpts = get(ShellCommandConfig.TASK_JVM_OPTS);
+    String autosizingContainerMaxHeap = 
get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB);
+
+    return Optional.ofNullable(getFinalJvmOptions(taskOpts, 
autosizingContainerMaxHeap));
+  }
+
+  /**
+   * Returns the worker opts for the application if available.
+   */
+  public Optional<String> getWorkerOpts() {
+    String autosizingWorkerHeapMb = 
get(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB);
+    String workerOpts = get(ShellCommandConfig.WORKER_JVM_OPTS);
+    return Optional.ofNullable(getFinalJvmOptions(workerOpts, 
autosizingWorkerHeapMb));
   }
 
   public Optional<String> getJavaHome() {
@@ -120,4 +122,26 @@ public class ShellCommandConfig extends MapConfig {
   public Optional<String> getAdditionalClasspathDir() {
     return 
Optional.ofNullable(get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR));
   }
+
+  /**
+   * Returns the final JVM options by applying the heap override if available 
to the jvm opts
+   */
+  @VisibleForTesting
+  String getFinalJvmOptions(String jvmOpts, String maxHeapOverride) {
+    String finalJvmOpts = jvmOpts;
+    if (new JobConfig(this).getAutosizingEnabled() && 
StringUtils.isNotEmpty(maxHeapOverride)) {
+      String xmxSetting = "-Xmx" + maxHeapOverride + "m";
+      if (StringUtils.isNotBlank(jvmOpts)) {
+        if (jvmOpts.contains("-Xmx")) {
+          finalJvmOpts = jvmOpts.replaceAll("-Xmx\\S+", xmxSetting);
+        } else {
+          finalJvmOpts = jvmOpts.concat(" " + xmxSetting);
+        }
+      } else {
+        finalJvmOpts = xmxSetting;
+      }
+    }
+
+    return finalJvmOpts;
+  }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java 
b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
index 37253442a..504d9a26f 100644
--- a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
@@ -44,6 +44,7 @@ public class ShellCommandBuilder extends CommandBuilder {
     envBuilder.put(ShellCommandConfig.ENV_CONTAINER_ID, this.id);
     envBuilder.put(ShellCommandConfig.ENV_COORDINATOR_URL, 
this.url.toString());
     envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, 
shellCommandConfig.getTaskOpts().orElse(""));
+    envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, 
shellCommandConfig.getWorkerOpts().orElse(""));
     envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
         shellCommandConfig.getAdditionalClasspathDir().orElse(""));
     shellCommandConfig.getJavaHome().ifPresent(javaHome -> 
envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome));
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
index 452883d12..d5f8085ba 100644
--- 
a/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
+++ 
b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
@@ -22,8 +22,7 @@ import java.util.Optional;
 import com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.*;
 
 
 public class TestShellCommandConfig {
@@ -81,6 +80,66 @@ public class TestShellCommandConfig {
     assertEquals(Optional.of("-Dproperty=value -Xmx1024m"), 
shellCommandConfig.getTaskOpts());
   }
 
+  @Test
+  public void testGetWorkerOptsAutosizingDisabled() {
+    ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new 
MapConfig(
+        ImmutableMap.of(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB,
+            "1024", "worker.opts", "-Xmx10m -Dproperty=value")));
+
+    String workerOpts = shellCommandConfig.getWorkerOpts()
+        .orElse(null);
+    String expectedOpts = "-Xmx10m -Dproperty=value";
+
+    assertNotNull(workerOpts);
+    assertEquals(expectedOpts, workerOpts);
+  }
+
+  @Test
+  public void testGetWorkerOptsAutosizingEnabled() {
+    ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new 
MapConfig(
+        ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", 
JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB,
+            "1024", "worker.opts", "-Xmx10m -Dproperty=value")));
+
+    String workerOpts = shellCommandConfig.getWorkerOpts()
+        .orElse(null);
+    String expectedOpts = "-Xmx1024m -Dproperty=value";
+
+    assertNotNull(workerOpts);
+    assertEquals(expectedOpts, workerOpts);
+  }
+
+  @Test
+  public void testGetFinalJvmOptionsAutosizingDisabled() {
+    ShellCommandConfig shellCommandConfig =
+        new ShellCommandConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
+    String jvmOptions = "";
+    String expectedJvmOptions = "";
+
+    // no override passed
+    assertEquals(expectedJvmOptions, 
shellCommandConfig.getFinalJvmOptions(jvmOptions, ""));
+
+    // ignore override since autosizing is disabled
+    assertEquals(expectedJvmOptions, 
shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
+  }
+
+  @Test
+  public void testGetFinalJvmOptionsAutosizingEnabled() {
+    ShellCommandConfig shellCommandConfig =
+        new ShellCommandConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
+    String jvmOptions = "-Xmx1024m";
+    String expectedJvmOptions = "-Xmx1024m";
+    assertEquals(expectedJvmOptions, 
shellCommandConfig.getFinalJvmOptions(jvmOptions, ""));
+
+    // override should take effect with autosizing enabled
+    expectedJvmOptions = "-Xmx2048m";
+    assertEquals(expectedJvmOptions, 
shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
+
+    // override should take effect even if xmx is not set
+    jvmOptions = "-Dproperty=value";
+    expectedJvmOptions = "-Dproperty=value -Xmx2048m";
+    assertEquals(expectedJvmOptions, 
shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
+  }
+
   @Test
   public void testGetJavaHome() {
     ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new 
MapConfig());
diff --git 
a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java 
b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
index ca7be0e4a..afb6bfeec 100644
--- a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
@@ -45,6 +45,7 @@ public class TestShellCommandBuilder {
         ShellCommandConfig.ENV_CONTAINER_ID, "1",
         ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
+        ShellCommandConfig.WORKER_JVM_OPTS, "",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
     // assertions when command path is not set
     assertEquals("foo", shellCommandBuilder.buildCommand());
@@ -60,6 +61,7 @@ public class TestShellCommandBuilder {
     Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
         .put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo")
         .put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g")
+        .put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
         .put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, 
"/path/to/additional/classpath")
         .put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home")
         .build());
@@ -71,6 +73,7 @@ public class TestShellCommandBuilder {
         .put(ShellCommandConfig.ENV_CONTAINER_ID, "1")
         .put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING)
         .put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g")
+        .put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
         .put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, 
"/path/to/additional/classpath")
         .put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home")
         .build();

Reply via email to