This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d660871  SAMZA-2466: [Scala cleanup] Convert ShellCommandConfig from scala to java (#1288)
d660871 is described below

commit d6608714c211835c3b04caaaa49882455135158c
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Mar 11 13:44:21 2020 -0700

    SAMZA-2466: [Scala cleanup] Convert ShellCommandConfig from scala to java (#1288)
    
    API changes: None
    Upgrade/usage instructions: None
---
 .../clustermanager/ClusterBasedJobCoordinator.java |   6 +-
 .../apache/samza/config/ShellCommandConfig.java}   |  89 +++++++++---------
 .../org/apache/samza/config/StreamConfig.java      |   0
 .../org/apache/samza/execution/JobPlanner.java     |   2 +-
 .../apache/samza/runtime/ContainerLaunchUtil.java  |   6 +-
 .../apache/samza/runtime/LocalContainerRunner.java |   6 +-
 .../org/apache/samza/job/ShellCommandBuilder.scala |  15 +--
 .../apache/samza/job/local/ThreadJobFactory.scala  |   9 +-
 .../org/apache/samza/config/TestJobConfig.java     |  39 --------
 .../samza/config/TestShellCommandConfig.java       | 103 +++++++++++++++++++++
 .../apache/samza/logging/log4j/StreamAppender.java |   2 +-
 .../samza/logging/log4j2/StreamAppender.java       |   2 +-
 .../samza/job/yarn/YarnClusterResourceManager.java |   4 +-
 .../org/apache/samza/job/yarn/TestYarnJob.java     |  30 +++---
 14 files changed, 193 insertions(+), 120 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index b56cb52..b9d054b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -467,7 +467,7 @@ public class ClusterBasedJobCoordinator {
    */
   public static void main(String[] args) {
     boolean dependencyIsolationEnabled = Boolean.parseBoolean(
-        
System.getenv(ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED()));
+        
System.getenv(ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED));
     if (!dependencyIsolationEnabled) {
       // no isolation enabled, so can just execute 
runClusterBasedJobCoordinator directly
       runClusterBasedJobCoordinator(args);
@@ -537,8 +537,8 @@ public class ClusterBasedJobCoordinator {
    * {@link #main(String[])} so that it can be executed directly or from a 
separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    final String coordinatorSystemEnv = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    final String submissionEnv = 
System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+    final String coordinatorSystemEnv = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG);
+    final String submissionEnv = 
System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG);
 
     if (!StringUtils.isBlank(submissionEnv)) {
       Config submissionConfig;
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 
b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
similarity index 60%
rename from 
samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
rename to 
samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 32da385..668c40a 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -16,44 +16,46 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.samza.config;
 
-package org.apache.samza.config
+import java.util.Optional;
 
-object ShellCommandConfig {
+
+public class ShellCommandConfig extends MapConfig {
   /**
    * This environment variable is used to store a JSON serialized map of all 
coordinator system configs.
    */
-  val ENV_COORDINATOR_SYSTEM_CONFIG = "SAMZA_COORDINATOR_SYSTEM_CONFIG"
+  public static final String ENV_COORDINATOR_SYSTEM_CONFIG = 
"SAMZA_COORDINATOR_SYSTEM_CONFIG";
 
   /**
    * This environment variable is used to pass a JSON serialized map of 
configs provided during job submission.
    */
-  val ENV_SUBMISSION_CONFIG = "SAMZA_SUBMISSION_CONFIG"
+  public static final String ENV_SUBMISSION_CONFIG = "SAMZA_SUBMISSION_CONFIG";
 
   /**
    * The ID for a container. This is a string representation that is unique to 
the runtime environment.
    */
-  val ENV_CONTAINER_ID = "SAMZA_CONTAINER_ID"
+  public static final String ENV_CONTAINER_ID = "SAMZA_CONTAINER_ID";
 
   /**
    * The URL location of the job coordinator's HTTP server.
    */
-  val ENV_COORDINATOR_URL = "SAMZA_COORDINATOR_URL"
+  public static final String ENV_COORDINATOR_URL = "SAMZA_COORDINATOR_URL";
 
   /**
    * Arguments to be passed to the processing running the TaskRunner (or 
equivalent, for non JVM languages).
    */
-  val ENV_JAVA_OPTS = "JAVA_OPTS"
+  public static final String ENV_JAVA_OPTS = "JAVA_OPTS";
 
   /**
    * The JAVA_HOME path for running the task
    */
-  val ENV_JAVA_HOME = "JAVA_HOME"
+  public static final String ENV_JAVA_HOME = "JAVA_HOME";
 
   /**
    * The ID assigned to the container by the execution environment (eg: YARN 
Container Id)
    */
-  val ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID"
+  public static final String ENV_EXECUTION_ENV_CONTAINER_ID = 
"EXECUTION_ENV_CONTAINER_ID";
 
   /**
    * Set to "true" if cluster-based job coordinator dependency isolation is 
enabled. Otherwise, will be considered
@@ -64,8 +66,8 @@ object ShellCommandConfig {
    * variable, because the value needs to be known before the full configs can 
be read from the metadata store (full
    * configs are only read after launch is complete).
    */
-  val ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
-    "CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED"
+  public static final String 
ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
+      "CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED";
 
   /**
    * When running the cluster-based job coordinator in an isolated mode, it 
uses JARs and resources from a lib directory
@@ -77,7 +79,7 @@ object ShellCommandConfig {
    * For example, this is used to set a system property for the location of an 
application-specified log4j configuration
    * file when launching the cluster-based job coordinator Java process.
    */
-  val ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR"
+  public static final String ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR";
 
   /*
    * The base directory for storing logged data stores used in Samza. This has 
to be set on all machine running Samza
@@ -85,21 +87,21 @@ object ShellCommandConfig {
    * If this environment variable is not set, the path defaults to current 
working directory (which is the same as the
    * path for persisting non-logged data stores)
    */
-  val ENV_LOGGED_STORE_BASE_DIR = "LOGGED_STORE_BASE_DIR"
+  public static final String ENV_LOGGED_STORE_BASE_DIR = 
"LOGGED_STORE_BASE_DIR";
 
   /**
    * The directory path that contains the execution plan
    */
-  val EXECUTION_PLAN_DIR = "EXECUTION_PLAN_DIR"
+  public static final String EXECUTION_PLAN_DIR = "EXECUTION_PLAN_DIR";
 
   /**
    * Points to the lib directory of the localized resources(other than the 
framework dependencies).
    */
-  val ENV_ADDITIONAL_CLASSPATH_DIR = "ADDITIONAL_CLASSPATH_DIR"
+  public static final String ENV_ADDITIONAL_CLASSPATH_DIR = 
"ADDITIONAL_CLASSPATH_DIR";
 
-  val COMMAND_SHELL_EXECUTE = "task.execute"
-  val TASK_JVM_OPTS = "task.opts"
-  val TASK_JAVA_HOME = "task.java.home"
+  public static final String COMMAND_SHELL_EXECUTE = "task.execute";
+  public static final String TASK_JVM_OPTS = "task.opts";
+  public static final String TASK_JAVA_HOME = "task.java.home";
 
   /**
    * SamzaContainer uses JARs from the lib directory of the framework in it 
classpath. In some cases, it is necessary to include
@@ -108,35 +110,38 @@ object ShellCommandConfig {
    * run-time before launching the SamzaContainer. This environment variable 
can be set to a lib directory of the localized resource and
    * it will be included in the java classpath of the SamzaContainer.
    */
-  val ADDITIONAL_CLASSPATH_DIR = "additional.classpath.dir"
-
-  implicit def Config2ShellCommand(config: Config) = new 
ShellCommandConfig(config)
-}
-
-class ShellCommandConfig(config: Config) extends ScalaMapConfig(config) {
-  def getCommand = 
getOption(ShellCommandConfig.COMMAND_SHELL_EXECUTE).getOrElse("bin/run-container.sh")
-
-  def getTaskOpts = {
-    var jvmOpts = getOption(ShellCommandConfig.TASK_JVM_OPTS)
-    val jobConfig = new JobConfig(config)
+  public static final String ADDITIONAL_CLASSPATH_DIR = 
"additional.classpath.dir";
 
-    if (jobConfig.getAutosizingEnabled && 
getOption(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB).isDefined) {
+  public ShellCommandConfig(Config config) {
+    super(config);
+  }
 
-      val maxHeapMb = 
getOption(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB).get
-      val xmxSetting = "-Xmx" + maxHeapMb + "m"
+  public String getCommand() {
+    return 
Optional.ofNullable(get(ShellCommandConfig.COMMAND_SHELL_EXECUTE)).orElse("bin/run-container.sh");
+  }
 
-      if (jvmOpts.isDefined && jvmOpts.get.contains("-Xmx"))
-        jvmOpts = Option(jvmOpts.get.replaceAll("-Xmx\\S+", xmxSetting))
-      else if (jvmOpts.isDefined)
-        jvmOpts = Option(jvmOpts.get.concat(" " + xmxSetting))
-      else
-        jvmOpts = Some(xmxSetting)
+  public Optional<String> getTaskOpts() {
+    Optional<String> jvmOpts = 
Optional.ofNullable(get(ShellCommandConfig.TASK_JVM_OPTS));
+    Optional<String> maxHeapMbOptional = 
Optional.ofNullable(get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB));
+    if (new JobConfig(this).getAutosizingEnabled() && 
maxHeapMbOptional.isPresent()) {
+      String maxHeapMb = maxHeapMbOptional.get();
+      String xmxSetting = "-Xmx" + maxHeapMb + "m";
+      if (jvmOpts.isPresent() && jvmOpts.get().contains("-Xmx")) {
+        jvmOpts = Optional.of(jvmOpts.get().replaceAll("-Xmx\\S+", 
xmxSetting));
+      } else if (jvmOpts.isPresent()) {
+        jvmOpts = Optional.of(jvmOpts.get().concat(" " + xmxSetting));
+      } else {
+        jvmOpts = Optional.of(xmxSetting);
+      }
     }
-
-    jvmOpts
+    return jvmOpts;
   }
 
-  def getJavaHome = getOption(ShellCommandConfig.TASK_JAVA_HOME)
+  public Optional<String> getJavaHome() {
+    return Optional.ofNullable(get(ShellCommandConfig.TASK_JAVA_HOME));
+  }
 
-  def getAdditionalClasspathDir(): Option[String] = 
getOption(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR)
+  public Optional<String> getAdditionalClasspathDir() {
+    return 
Optional.ofNullable(get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR));
+  }
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/StreamConfig.java
similarity index 100%
rename from samza-core/src/main/scala/org/apache/samza/config/StreamConfig.java
rename to samza-core/src/main/java/org/apache/samza/config/StreamConfig.java
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index f8e0684..72c2972 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -111,7 +111,7 @@ public abstract class JobPlanner {
   final void writePlanJsonFile(String planJson) {
     try {
       String content = "plan='" + planJson + "'";
-      String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR());
+      String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR);
       if (planPath != null && !planPath.isEmpty()) {
         // Write the plan json to plan path
         File file = new File(planPath + "/plan.json");
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 54cb298..c68ec03 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -65,7 +65,7 @@ public class ContainerLaunchUtil {
    * Any change here needs to take Beam into account.
    */
   public static void run(ApplicationDescriptorImpl<? extends 
ApplicationDescriptor> appDesc,  String containerId, JobModel jobModel) {
-    Optional<String> execEnvContainerId = 
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID()));
+    Optional<String> execEnvContainerId = 
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
     JobConfig jobConfig = new JobConfig(jobModel.getConfig());
     ContainerLaunchUtil.run(appDesc, jobConfig.getName().get(), 
jobConfig.getJobId(), containerId, execEnvContainerId, jobModel);
   }
@@ -182,8 +182,8 @@ public class ContainerLaunchUtil {
    * @return a new {@link ContainerHeartbeatMonitor} instance, or null if 
could not create one
    */
   private static ContainerHeartbeatMonitor 
createContainerHeartbeatMonitor(SamzaContainer container) {
-    String coordinatorUrl = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
-    String executionEnvContainerId = 
System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
+    String coordinatorUrl = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
+    String executionEnvContainerId = 
System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
     if (executionEnvContainerId != null) {
       log.info("Got execution environment container id: {}", 
executionEnvContainerId);
       return new ContainerHeartbeatMonitor(() -> {
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 07cd637..f8d5e40 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -50,15 +50,15 @@ public class LocalContainerRunner {
           System.exit(1);
         }));
 
-    String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+    String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
     log.info(String.format("Got container ID: %s", containerId));
     System.out.println(String.format("Container ID: %s", containerId));
 
-    String coordinatorUrl = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+    String coordinatorUrl = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
     log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
     System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
-    Optional<String> execEnvContainerId = 
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID()));
+    Optional<String> execEnvContainerId = 
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
 
     int delay = new 
Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
     JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala 
b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index 42c50e1..9b95648 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -23,25 +23,28 @@ package org.apache.samza.job
 import java.io.File
 
 import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+
 import scala.collection.JavaConverters._
 
 class ShellCommandBuilder extends CommandBuilder {
   def buildCommand() = {
+    val shellCommandConfig = new ShellCommandConfig(config)
     if(commandPath == null || commandPath.isEmpty())
-      config.getCommand
+      shellCommandConfig.getCommand
     else
-      commandPath + File.separator +  config.getCommand
+      commandPath + File.separator +  shellCommandConfig.getCommand
   }
 
   def buildEnvironment(): java.util.Map[String, String] = {
+    val shellCommandConfig = new ShellCommandConfig(config)
     val envMap = Map(
       ShellCommandConfig.ENV_CONTAINER_ID -> id.toString,
       ShellCommandConfig.ENV_COORDINATOR_URL -> url.toString,
-      ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""),
-      ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR -> 
config.getAdditionalClasspathDir.getOrElse(""))
+      ShellCommandConfig.ENV_JAVA_OPTS -> 
shellCommandConfig.getTaskOpts.orElse(""),
+      ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR -> 
shellCommandConfig.getAdditionalClasspathDir.orElse(""))
 
-    val envMapWithJavaHome = config.getJavaHome match {
+    val envMapWithJavaHome = 
JavaOptionals.toRichOptional(shellCommandConfig.getJavaHome).toOption match {
       case Some(javaHome) => envMap + (ShellCommandConfig.ENV_JAVA_HOME -> 
javaHome)
       case None => envMap
     }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index deea95a..c1a3683 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -23,8 +23,7 @@ import org.apache.samza.SamzaException
 import org.apache.samza.application.ApplicationUtil
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.JobConfig._
-import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, JobConfig}
+import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, 
TaskName}
 import org.apache.samza.context.{ExternalContext, JobContextImpl}
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
@@ -38,6 +37,7 @@ import org.apache.samza.runtime.ProcessorContext
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, 
DiagnosticsUtil, Logging}
 
 import scala.collection.JavaConversions._
@@ -110,9 +110,10 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
     val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
 
     // Give developers a nice friendly warning if they've specified task.opts 
and are using a threaded job.
-    config.getTaskOpts match {
+    JavaOptionals.toRichOptional(new 
ShellCommandConfig(config).getTaskOpts).toOption match {
       case Some(taskOpts) => warn("%s was specified in config, but is not 
being used because job is being executed with ThreadJob. " +
-        "You probably want to run %s=%s." format(TASK_JVM_OPTS, 
STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
+        "You probably want to run %s=%s." 
format(ShellCommandConfig.TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS,
+        classOf[ProcessJobFactory].getName))
       case _ => None
     }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 9c3c97e..abe6dfa 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -33,7 +33,6 @@ import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStore
 import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
 import org.junit.Assert;
 import org.junit.Test;
-import scala.Option;
 
 import static org.junit.Assert.*;
 
@@ -604,42 +603,4 @@ public class TestJobConfig {
     Assert.assertEquals(900, clusterManagerConfig.getContainerMemoryMb());
     Assert.assertEquals(2, clusterManagerConfig.getNumCores());
   }
-
-  @Test
-  public void testGetTaskOptsAutosizingDisabled() {
-    ShellCommandConfig shellCommandConfig =
-        new ShellCommandConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
-    assertEquals(Option.empty(), shellCommandConfig.getTaskOpts());
-
-    String taskOpts = "-Dproperty=value";
-    shellCommandConfig = new ShellCommandConfig(new MapConfig(
-        ImmutableMap.of(ShellCommandConfig.TASK_JVM_OPTS(), taskOpts, 
JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
-    assertEquals(Option.apply(taskOpts), shellCommandConfig.getTaskOpts());
-  }
-
-  @Test
-  public void testGetTaskOptsAutosizingEnabled() {
-    // opts not set, autosizing max heap not set
-    ShellCommandConfig shellCommandConfig =
-        new ShellCommandConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
-    assertEquals(Option.empty(), shellCommandConfig.getTaskOpts());
-
-    // opts set, autosizing max heap not set
-    String taskOpts = "-Dproperty=value";
-    shellCommandConfig = new ShellCommandConfig(new MapConfig(
-        ImmutableMap.of(ShellCommandConfig.TASK_JVM_OPTS(), taskOpts, 
JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
-    assertEquals(Option.apply(taskOpts), shellCommandConfig.getTaskOpts());
-
-    // opts set with Xmx, autosizing max heap set
-    shellCommandConfig = new ShellCommandConfig(new MapConfig(
-        ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", 
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
-            "1024", "task.opts", "-Xmx10m -Dproperty=value")));
-    assertEquals(Option.apply("-Xmx1024m -Dproperty=value"), 
shellCommandConfig.getTaskOpts());
-
-    // opts set without -Xmx, autosizing max heap set
-    shellCommandConfig = new ShellCommandConfig(new MapConfig(
-        ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", 
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
-            "1024", "task.opts", "-Dproperty=value")));
-    assertEquals(Option.apply("-Dproperty=value -Xmx1024m"), 
shellCommandConfig.getTaskOpts());
-  }
 }
\ No newline at end of file
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
new file mode 100644
index 0000000..452883d
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Optional;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestShellCommandConfig {
+  @Test
+  public void testGetCommand() {
+    ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new 
MapConfig());
+    assertEquals("bin/run-container.sh", shellCommandConfig.getCommand());
+
+    shellCommandConfig = new ShellCommandConfig(
+        new 
MapConfig(ImmutableMap.of(ShellCommandConfig.COMMAND_SHELL_EXECUTE, 
"my-run-container.sh")));
+    assertEquals("my-run-container.sh", shellCommandConfig.getCommand());
+  }
+
+  @Test
+  public void testGetTaskOptsAutosizingDisabled() {
+    ShellCommandConfig shellCommandConfig =
+        new ShellCommandConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
+    assertEquals(Optional.empty(), shellCommandConfig.getTaskOpts());
+
+    String taskOpts = "-Dproperty=value";
+    shellCommandConfig = new ShellCommandConfig(new MapConfig(
+        ImmutableMap.of(ShellCommandConfig.TASK_JVM_OPTS, taskOpts, 
JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
+    assertEquals(Optional.of(taskOpts), shellCommandConfig.getTaskOpts());
+  }
+
+  @Test
+  public void testGetTaskOptsAutosizingEnabled() {
+    // opts not set, autosizing max heap not set
+    ShellCommandConfig shellCommandConfig =
+        new ShellCommandConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
+    assertEquals(Optional.empty(), shellCommandConfig.getTaskOpts());
+
+    // opts set, autosizing max heap not set
+    String taskOpts = "-Dproperty=value";
+    shellCommandConfig = new ShellCommandConfig(new MapConfig(
+        ImmutableMap.of(ShellCommandConfig.TASK_JVM_OPTS, taskOpts, 
JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
+    assertEquals(Optional.of(taskOpts), shellCommandConfig.getTaskOpts());
+
+    // opts not set, autosizing max heap set
+    shellCommandConfig = new ShellCommandConfig(new MapConfig(
+        ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", 
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
+            "1024")));
+    assertEquals(Optional.of("-Xmx1024m"), shellCommandConfig.getTaskOpts());
+
+    // opts set with Xmx, autosizing max heap set
+    shellCommandConfig = new ShellCommandConfig(new MapConfig(
+        ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", 
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
+            "1024", "task.opts", "-Xmx10m -Dproperty=value")));
+    assertEquals(Optional.of("-Xmx1024m -Dproperty=value"), 
shellCommandConfig.getTaskOpts());
+
+    // opts set without -Xmx, autosizing max heap set
+    shellCommandConfig = new ShellCommandConfig(new MapConfig(
+        ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", 
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
+            "1024", "task.opts", "-Dproperty=value")));
+    assertEquals(Optional.of("-Dproperty=value -Xmx1024m"), 
shellCommandConfig.getTaskOpts());
+  }
+
+  @Test
+  public void testGetJavaHome() {
+    ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new 
MapConfig());
+    assertFalse(shellCommandConfig.getJavaHome().isPresent());
+
+    shellCommandConfig =
+        new ShellCommandConfig(new 
MapConfig(ImmutableMap.of(ShellCommandConfig.TASK_JAVA_HOME, 
"/location/java")));
+    assertEquals(Optional.of("/location/java"), 
shellCommandConfig.getJavaHome());
+  }
+
+  @Test
+  public void testGetAdditionalClasspathDir() {
+    ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new 
MapConfig());
+    assertFalse(shellCommandConfig.getAdditionalClasspathDir().isPresent());
+
+    shellCommandConfig = new ShellCommandConfig(
+        new 
MapConfig(ImmutableMap.of(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, 
"/location/classpath")));
+    assertEquals(Optional.of("/location/classpath"), 
shellCommandConfig.getAdditionalClasspathDir());
+  }
+}
\ No newline at end of file
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index a039858..c885454 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -271,7 +271,7 @@ public class StreamAppender extends AppenderSkeleton {
       if (isApplicationMaster) {
         config = 
JobModelManager.currentJobModelManager().jobModel().getConfig();
       } else {
-        String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+        String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
         String response = HttpUtil.read(new URL(url), 30000, new 
ExponentialSleepStrategy());
         config = SamzaObjectMapper.getObjectMapper().readValue(response, 
JobModel.class).getConfig();
       }
diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index f6fb4d9..e033376 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -292,7 +292,7 @@ public class StreamAppender extends AbstractAppender {
       if (isApplicationMaster) {
         config = 
JobModelManager.currentJobModelManager().jobModel().getConfig();
       } else {
-        String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+        String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
         String response = HttpUtil.read(new URL(url), 30000, new 
ExponentialSleepStrategy());
         config = SamzaObjectMapper.getObjectMapper().readValue(response, 
JobModel.class).getConfig();
       }
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index eb97b69..e05b31e 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -300,7 +300,7 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
    */
   @Override
   public void launchStreamProcessor(SamzaResource resource, CommandBuilder 
builder) {
-    String processorId = 
builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
+    String processorId = 
builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID);
     String containerId = resource.getContainerId();
     String host = resource.getHost();
     log.info("Starting Processor ID: {} on Container ID: {} on host: {}", 
processorId, containerId, host);
@@ -599,7 +599,7 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     String command = cmdBuilder.buildCommand();
 
     Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
-    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), 
Util.envVarEscape(container.getId().toString()));
+    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID, 
Util.envVarEscape(container.getId().toString()));
 
     Path packagePath = new Path(yarnConfig.getPackagePath());
     String formattedCommand =
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
index daa719b..7961360 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
@@ -64,9 +64,9 @@ public class TestYarnJob {
     String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
     Map<String, String> expected = ImmutableMap.of(
-        ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG(), expectedCoordinatorStreamConfigStringValue,
-        ShellCommandConfig.ENV_JAVA_OPTS(), Util.envVarEscape(amJvmOptions),
-        ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(), "false");
+        ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
+        ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions),
+        ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "false");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
   }
@@ -83,10 +83,10 @@ public class TestYarnJob {
     String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
     Map<String, String> expected = ImmutableMap.of(
-        ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG(), expectedCoordinatorStreamConfigStringValue,
-        ShellCommandConfig.ENV_JAVA_OPTS(), "",
-        ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(), "true",
-        ShellCommandConfig.ENV_APPLICATION_LIB_DIR(), "./__package/lib");
+        ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
+        ShellCommandConfig.ENV_JAVA_OPTS, "",
+        ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true",
+        ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
   }
@@ -104,10 +104,10 @@ public class TestYarnJob {
     String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
     Map<String, String> expected = ImmutableMap.of(
-        ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG(), expectedCoordinatorStreamConfigStringValue,
-        ShellCommandConfig.ENV_JAVA_OPTS(), "",
-        ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(), "false",
-        ShellCommandConfig.ENV_JAVA_HOME(), "/some/path/to/java/home");
+        ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
+        ShellCommandConfig.ENV_JAVA_OPTS, "",
+        ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "false",
+        ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
   }
@@ -124,10 +124,10 @@ public class TestYarnJob {
     String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(config));
     Map<String, String> expected = ImmutableMap.of(
-        ShellCommandConfig.ENV_SUBMISSION_CONFIG(), expectedSubmissionConfig,
-        ShellCommandConfig.ENV_JAVA_OPTS(), "",
-        ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(), "true",
-        ShellCommandConfig.ENV_APPLICATION_LIB_DIR(), "./__package/lib");
+        ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
+        ShellCommandConfig.ENV_JAVA_OPTS, "",
+        ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true",
+        ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
   }

Reply via email to