This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 65f31eb6e SAMZA-2763: Support worker JVM opts for Samza Beam portable
mode (#1689)
65f31eb6e is described below
commit 65f31eb6e7da19a39b082635d20f730059aac8cb
Author: Bharath Kumarasubramanian <[email protected]>
AuthorDate: Mon Nov 20 14:02:51 2023 -0800
SAMZA-2763: Support worker JVM opts for Samza Beam portable mode (#1689)
Summary: Support JVM options for the worker process in Samza Beam portable mode
Description: With portable mode support for Samza Beam, we want to tune and
configure the JVM options for the worker process. This PR adds that support by
introducing the worker.opts configuration and integrating it with autosizing.
Changes:
- Added worker.opts configuration
- Added autosizing integration support for Xmx
- Updated configuration table and website
API Changes: None
Usage Instructions: worker.opts can be used like any other Samza
application configuration, although it only applies to the Samza Beam portable
execution mode and is ignored otherwise.
Upgrade Instructions: None
---
.../versioned/jobs/configuration-table.html | 15 ++++++
.../java/org/apache/samza/config/JobConfig.java | 1 +
.../apache/samza/config/ShellCommandConfig.java | 52 +++++++++++++-----
.../org/apache/samza/job/ShellCommandBuilder.java | 1 +
.../samza/config/TestShellCommandConfig.java | 63 +++++++++++++++++++++-
.../apache/samza/job/TestShellCommandBuilder.java | 3 ++
6 files changed, 119 insertions(+), 16 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index f4c8d4d7b..e00c983d8 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -2048,6 +2048,21 @@
</th>
</tr>
+ <tr>
+ <td class="property" id="worker-opts">worker.opts</td>
+ <td class="default"></td>
+ <td class="description">
+ Any JVM options to include in the command line when
executing worker process in portable execution of Samza using beam. For example,
+ this can be used to set the JVM heap size, to tune the
garbage collector, or to enable
+ <a
href="/learn/tutorials/{{site.version}}/remote-debugging-samza.html">remote
debugging</a>.
+ Anything you put in <code>worker.opts</code> gets
forwarded directly to the commandline of worker process as part of the JVM
invocation.
+ <b>Note:</b> The configuration only applies for Samza
Beam portable mode.
+ <dl>
+ <dt>Example:
<code>worker.opts=-XX:+HeapDumpOnOutOfMemoryError
-XX:+UseConcMarkSweepGC</code></dt>
+ </dl>
+ </td>
+ </tr>
+
<tr>
<td class="property"
id="yarn-package-path">yarn.package.path</td>
<td class="default"></td>
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index b9aa82cf2..3d0b53262 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -158,6 +158,7 @@ public class JobConfig extends MapConfig {
public static final String JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB =
JOB_AUTOSIZING_CONFIG_PREFIX + "container.maxheap.mb";
public static final String JOB_AUTOSIZING_CONTAINER_MEMORY_MB =
JOB_AUTOSIZING_CONFIG_PREFIX + "container.memory.mb";
public static final String JOB_AUTOSIZING_CONTAINER_MAX_CORES =
JOB_AUTOSIZING_CONFIG_PREFIX + "container.cpu.cores";
+ public static final String JOB_AUTOSIZING_WORKER_MAX_HEAP_MB =
JOB_AUTOSIZING_CONFIG_PREFIX + "worker.maxheap.mb";
public static final String COORDINATOR_STREAM_FACTORY =
"job.coordinatorstream.config.factory";
public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY =
"org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
diff --git
a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 73bcf8ec6..90780c81d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -18,7 +18,9 @@
*/
package org.apache.samza.config;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
public class ShellCommandConfig extends MapConfig {
@@ -77,6 +79,7 @@ public class ShellCommandConfig extends MapConfig {
public static final String COMMAND_SHELL_EXECUTE = "task.execute";
public static final String TASK_JVM_OPTS = "task.opts";
+ public static final String WORKER_JVM_OPTS = "worker.opts";
public static final String TASK_JAVA_HOME = "task.java.home";
/**
@@ -97,20 +100,19 @@ public class ShellCommandConfig extends MapConfig {
}
public Optional<String> getTaskOpts() {
- Optional<String> jvmOpts =
Optional.ofNullable(get(ShellCommandConfig.TASK_JVM_OPTS));
- Optional<String> maxHeapMbOptional =
Optional.ofNullable(get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB));
- if (new JobConfig(this).getAutosizingEnabled() &&
maxHeapMbOptional.isPresent()) {
- String maxHeapMb = maxHeapMbOptional.get();
- String xmxSetting = "-Xmx" + maxHeapMb + "m";
- if (jvmOpts.isPresent() && jvmOpts.get().contains("-Xmx")) {
- jvmOpts = Optional.of(jvmOpts.get().replaceAll("-Xmx\\S+",
xmxSetting));
- } else if (jvmOpts.isPresent()) {
- jvmOpts = Optional.of(jvmOpts.get().concat(" " + xmxSetting));
- } else {
- jvmOpts = Optional.of(xmxSetting);
- }
- }
- return jvmOpts;
+ String taskOpts = get(ShellCommandConfig.TASK_JVM_OPTS);
+ String autosizingContainerMaxHeap =
get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB);
+
+ return Optional.ofNullable(getFinalJvmOptions(taskOpts,
autosizingContainerMaxHeap));
+ }
+
+ /**
+ * Returns the worker opts for the application if available.
+ */
+ public Optional<String> getWorkerOpts() {
+ String autosizingWorkerHeapMb =
get(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB);
+ String workerOpts = get(ShellCommandConfig.WORKER_JVM_OPTS);
+ return Optional.ofNullable(getFinalJvmOptions(workerOpts,
autosizingWorkerHeapMb));
}
public Optional<String> getJavaHome() {
@@ -120,4 +122,26 @@ public class ShellCommandConfig extends MapConfig {
public Optional<String> getAdditionalClasspathDir() {
return
Optional.ofNullable(get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR));
}
+
+ /**
+ * Returns the final JVM options by applying the heap override if available
to the jvm opts
+ */
+ @VisibleForTesting
+ String getFinalJvmOptions(String jvmOpts, String maxHeapOverride) {
+ String finalJvmOpts = jvmOpts;
+ if (new JobConfig(this).getAutosizingEnabled() &&
StringUtils.isNotEmpty(maxHeapOverride)) {
+ String xmxSetting = "-Xmx" + maxHeapOverride + "m";
+ if (StringUtils.isNotBlank(jvmOpts)) {
+ if (jvmOpts.contains("-Xmx")) {
+ finalJvmOpts = jvmOpts.replaceAll("-Xmx\\S+", xmxSetting);
+ } else {
+ finalJvmOpts = jvmOpts.concat(" " + xmxSetting);
+ }
+ } else {
+ finalJvmOpts = xmxSetting;
+ }
+ }
+
+ return finalJvmOpts;
+ }
}
diff --git
a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
index 37253442a..504d9a26f 100644
--- a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
@@ -44,6 +44,7 @@ public class ShellCommandBuilder extends CommandBuilder {
envBuilder.put(ShellCommandConfig.ENV_CONTAINER_ID, this.id);
envBuilder.put(ShellCommandConfig.ENV_COORDINATOR_URL,
this.url.toString());
envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS,
shellCommandConfig.getTaskOpts().orElse(""));
+ envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS,
shellCommandConfig.getWorkerOpts().orElse(""));
envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
shellCommandConfig.getAdditionalClasspathDir().orElse(""));
shellCommandConfig.getJavaHome().ifPresent(javaHome ->
envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome));
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
index 452883d12..d5f8085ba 100644
---
a/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
+++
b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
@@ -22,8 +22,7 @@ import java.util.Optional;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.*;
public class TestShellCommandConfig {
@@ -81,6 +80,66 @@ public class TestShellCommandConfig {
assertEquals(Optional.of("-Dproperty=value -Xmx1024m"),
shellCommandConfig.getTaskOpts());
}
+ @Test
+ public void testGetWorkerOptsAutosizingDisabled() {
+ ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new
MapConfig(
+ ImmutableMap.of(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB,
+ "1024", "worker.opts", "-Xmx10m -Dproperty=value")));
+
+ String workerOpts = shellCommandConfig.getWorkerOpts()
+ .orElse(null);
+ String expectedOpts = "-Xmx10m -Dproperty=value";
+
+ assertNotNull(workerOpts);
+ assertEquals(expectedOpts, workerOpts);
+ }
+
+ @Test
+ public void testGetWorkerOptsAutosizingEnabled() {
+ ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new
MapConfig(
+ ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true",
JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB,
+ "1024", "worker.opts", "-Xmx10m -Dproperty=value")));
+
+ String workerOpts = shellCommandConfig.getWorkerOpts()
+ .orElse(null);
+ String expectedOpts = "-Xmx1024m -Dproperty=value";
+
+ assertNotNull(workerOpts);
+ assertEquals(expectedOpts, workerOpts);
+ }
+
+ @Test
+ public void testGetFinalJvmOptionsAutosizingDisabled() {
+ ShellCommandConfig shellCommandConfig =
+ new ShellCommandConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
+ String jvmOptions = "";
+ String expectedJvmOptions = "";
+
+ // no override passed
+ assertEquals(expectedJvmOptions,
shellCommandConfig.getFinalJvmOptions(jvmOptions, ""));
+
+ // ignore override since autosizing is disabled
+ assertEquals(expectedJvmOptions,
shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
+ }
+
+ @Test
+ public void testGetFinalJvmOptionsAutosizingEnabled() {
+ ShellCommandConfig shellCommandConfig =
+ new ShellCommandConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
+ String jvmOptions = "-Xmx1024m";
+ String expectedJvmOptions = "-Xmx1024m";
+ assertEquals(expectedJvmOptions,
shellCommandConfig.getFinalJvmOptions(jvmOptions, ""));
+
+ // override should take effect with autosizing enabled
+ expectedJvmOptions = "-Xmx2048m";
+ assertEquals(expectedJvmOptions,
shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
+
+ // override should take effect even if xmx is not set
+ jvmOptions = "-Dproperty=value";
+ expectedJvmOptions = "-Dproperty=value -Xmx2048m";
+ assertEquals(expectedJvmOptions,
shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
+ }
+
@Test
public void testGetJavaHome() {
ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new
MapConfig());
diff --git
a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
index ca7be0e4a..afb6bfeec 100644
--- a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
@@ -45,6 +45,7 @@ public class TestShellCommandBuilder {
ShellCommandConfig.ENV_CONTAINER_ID, "1",
ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING,
ShellCommandConfig.ENV_JAVA_OPTS, "",
+ ShellCommandConfig.WORKER_JVM_OPTS, "",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
// assertions when command path is not set
assertEquals("foo", shellCommandBuilder.buildCommand());
@@ -60,6 +61,7 @@ public class TestShellCommandBuilder {
Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
.put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo")
.put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g")
+ .put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
.put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR,
"/path/to/additional/classpath")
.put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home")
.build());
@@ -71,6 +73,7 @@ public class TestShellCommandBuilder {
.put(ShellCommandConfig.ENV_CONTAINER_ID, "1")
.put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING)
.put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g")
+ .put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
"/path/to/additional/classpath")
.put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home")
.build();