This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 4347057  SAMZA-2106: Samza app and job config refactor (#920)
4347057 is described below

commit 434705750a454d989f00a2f0afd15a2e20b4cf5b
Author: Sanil Jain <[email protected]>
AuthorDate: Wed Mar 6 17:50:38 2019 -0800

    SAMZA-2106: Samza app and job config refactor (#920)
---
 .../versioned/jobs/samza-configurations.md         |  4 +-
 .../StreamApplicationDescriptorImpl.java           |  8 +--
 .../stream/CoordinatorStreamWriter.java            |  6 ++-
 .../apache/samza/execution/ExecutionPlanner.java   |  8 ++-
 .../execution/JobNodeConfigurationGenerator.java   | 25 +++++++++
 .../org/apache/samza/execution/JobPlanner.java     | 42 ++++++++++++++-
 .../samza/runtime/RemoteApplicationRunner.java     |  7 +--
 .../apache/samza/checkpoint/CheckpointTool.scala   | 18 +++----
 .../TestStreamApplicationDescriptorImpl.java       | 24 ++++-----
 .../stream/TestCoordinatorStreamWriter.java        | 14 ++---
 .../TestJobNodeConfigurationGenerator.java         | 20 +++++++
 .../org/apache/samza/execution/TestJobPlanner.java | 62 ++++++++++++++++++++++
 .../samza/runtime/TestLocalApplicationRunner.java  |  8 +--
 .../samza/runtime/TestRemoteApplicationRunner.java |  2 +
 .../samza/checkpoint/TestCheckpointTool.scala      | 23 ++++----
 .../apache/samza/config/KafkaConsumerConfig.java   |  3 +-
 samza-test/src/main/config/join/checker.samza      |  4 +-
 samza-test/src/main/config/join/emitter.samza      |  4 +-
 samza-test/src/main/config/join/joiner.samza       |  4 +-
 samza-test/src/main/config/join/watcher.samza      |  4 +-
 .../main/config/standalone.failure.test.properties |  2 -
 .../apache/samza/test/framework/TestRunner.java    | 10 ++--
 .../StreamApplicationIntegrationTestHarness.java   |  2 +-
 .../benchmark/SystemConsumerWithSamzaBench.java    |  3 +-
 24 files changed, 231 insertions(+), 76 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md 
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 757d552..4fcc87b 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -50,8 +50,8 @@ These are the basic properties for setting up a Samza 
application.
 |app.id|1|If you run several instances of your application at the same time, 
you need to give each instance a different app.id. This is important, since 
otherwise the applications will overwrite each others' checkpoints, and perhaps 
interfere with each other in other ways.|
 |app.class| |This is __required if running on YARN__. The application to run. 
The value is a fully-qualified Java classname, which must implement 
StreamApplication. A StreamApplication describes as a series of transformations 
on the streams.|
 |job.factory.class| |This is __required if running on YARN__. The job factory 
to use for running this job. <br> The value is a fully-qualified Java 
classname, which must implement StreamJobFactory.<br> Samza ships with three 
implementations:<br><br>`org.apache.samza.job.yarn.YarnJobFactory`<br>Runs your 
job on a YARN grid. See below for YARN-specific 
configuration.<br><br>`org.apache.samza.job.local.ThreadJobFactory`<br>__For 
dev deployments only.__ Runs your job on your local machine us [...]
-|job.name| |__Required:__ The name of your job. This name appears on the Samza 
dashboard, and it is used to tell apart this job's checkpoints from other jobs' 
checkpoints.|
-|job.id|1|If you run several instances of your job at the same time, you need 
to give each execution a different job.id. This is important, since otherwise 
the jobs will overwrite each others' checkpoints, and perhaps interfere with 
each other in other ways.|
+|job.name| |_(Deprecated in favor of app.name)_  The name of your job. This 
name appears on the Samza dashboard, and it is used to tell apart this job's 
checkpoints from other jobs' checkpoints.|
+|job.id|1|_(Deprecated in favor of app.id)_ If you run several instances of 
your job at the same time, you need to give each execution a different job.id. 
This is important, since otherwise the jobs will overwrite each others' 
checkpoints, and perhaps interfere with each other in other ways.|
 |job.default.system| |__Required:__ The system-name to use for creating input 
or output streams for which the system is not explicitly configured. This 
property will also be used as default for `job.coordinator.system`, 
`task.checkpoint.system` and `job.changelog.system` if none are defined.|
 |task.class| |Used for legacy purposes; replace with `app.class` in new jobs. 
The fully-qualified name of the Java class which processes incoming messages 
from input streams. The class must implement 
[StreamTask](../api/javadocs/org/apache/samza/task/StreamTask.html) or 
[AsyncStreamTask](../api/javadocs/org/apache/samza/task/AsyncStreamTask.html), 
and may optionally implement 
[InitableTask](../api/javadocs/org/apache/samza/task/InitableTask.html), 
[ClosableTask](../api/javadocs/org/apach [...]
 |job.host-affinity.enabled|false|This property indicates whether host-affinity 
is enabled or not. Host-affinity refers to the ability of Samza to request and 
allocate a container on the same host every time the job is deployed. When 
host-affinity is enabled, Samza makes a "best-effort" to honor the 
host-affinity constraint. The property 
`cluster-manager.container.request.timeout.ms` determines how long to wait 
before de-prioritizing the host-affinity constraint and assigning the containe 
[...]
diff --git 
a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
index 2632f4b..956d34e 100644
--- 
a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
@@ -30,8 +30,8 @@ import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.OutputDescriptor;
 import org.apache.samza.operators.KV;
@@ -166,10 +166,10 @@ public class StreamApplicationDescriptorImpl extends 
ApplicationDescriptorImpl<S
       throw new SamzaException("Operator ID must not contain spaces or special 
characters: " + userDefinedId);
     }
 
-    JobConfig jobConfig = new JobConfig(getConfig());
+    ApplicationConfig applicationConfig = new ApplicationConfig(getConfig());
     String nextOpId = String.format("%s-%s-%s-%s",
-        jobConfig.getName().get(),
-        jobConfig.getJobId(),
+        applicationConfig.getAppName(),
+        applicationConfig.getAppId(),
         opCode.name().toLowerCase(),
         StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : 
String.valueOf(nextOpNum));
     if (!operatorIds.add(nextOpId)) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index daca6a0..2e857f4 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -22,6 +22,7 @@ package org.apache.samza.coordinator.stream;
 import joptsimple.OptionSet;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.execution.JobPlanner;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,12 +115,13 @@ public class CoordinatorStreamWriter {
   public static void main(String[] args) {
     CoordinatorStreamWriterCommandLine cmdline = new 
CoordinatorStreamWriterCommandLine();
     OptionSet options = cmdline.parser().parse(args);
-    Config config = cmdline.loadConfig(options);
+    Config userConfig = cmdline.loadConfig(options);
+    Config generatedConfig = JobPlanner.generateSingleJobConfig(userConfig);
     String type = cmdline.loadType(options);
     String key = cmdline.loadKey(options);
     String value = cmdline.loadValue(options);
 
-    CoordinatorStreamWriter writer = new CoordinatorStreamWriter(config);
+    CoordinatorStreamWriter writer = new 
CoordinatorStreamWriter(generatedConfig);
     writer.start();
     writer.sendMessage(type, key, value);
     writer.stop();
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 0c5e368..48b3f4b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -43,6 +43,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -125,9 +126,12 @@ public class ExecutionPlanner {
 
     Set<TableDescriptor> tables = appDesc.getTableDescriptors();
 
+    // Generate job.id and job.name configs from app.id and app.name if defined
+    MapConfig generatedJobConfigs = JobPlanner.generateSingleJobConfig(config);
+    String jobName = generatedJobConfigs.get(JobConfig.JOB_NAME());
+    String jobId = generatedJobConfigs.get(JobConfig.JOB_ID(), "1");
+
     // For this phase, we have a single job node for the whole DAG
-    String jobName = config.get(JobConfig.JOB_NAME());
-    String jobId = config.get(JobConfig.JOB_ID(), "1");
     JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId);
 
     // Add input streams
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
 
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 761fb05..92acabd 100644
--- 
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -31,6 +31,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.JobConfig;
@@ -69,7 +71,9 @@ import org.slf4j.LoggerFactory;
   static final String CONFIG_INTERNAL_EXECUTION_PLAN = 
"samza.internal.execution.plan";
 
   static Config mergeConfig(Map<String, String> originalConfig, Map<String, 
String> generatedConfig) {
+    validateJobConfigs(originalConfig, generatedConfig);
     Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
+
     originalConfig.forEach((k, v) -> {
         if (generatedConfig.containsKey(k) && 
!Objects.equals(generatedConfig.get(k), v)) {
           LOG.info("Replacing generated config for key: {} value: {} with 
original config value: {}", k, generatedConfig.get(k), v);
@@ -80,6 +84,27 @@ import org.slf4j.LoggerFactory;
     return Util.rewriteConfig(new MapConfig(mergedConfig));
   }
 
+  static void validateJobConfigs(Map<String, String> originalConfig, 
Map<String, String> generatedConfig) {
+    String userConfiguredJobId = originalConfig.get(JobConfig.JOB_ID());
+    String userConfiguredJobName = originalConfig.get(JobConfig.JOB_NAME());
+    String generatedJobId = generatedConfig.get(JobConfig.JOB_ID());
+    String generatedJobName = generatedConfig.get(JobConfig.JOB_NAME());
+
+    if (generatedJobName != null && userConfiguredJobName != null && 
!StringUtils.equals(generatedJobName,
+        userConfiguredJobName)) {
+      throw new SamzaException(String.format(
+          "Generated job.name = %s from app.name = %s does not match user 
configured job.name = %s, please configure job.name same as app.name",
+          generatedJobName, originalConfig.get(ApplicationConfig.APP_NAME), 
userConfiguredJobName));
+    }
+
+    if (generatedJobId != null && userConfiguredJobId != null && 
!StringUtils.equals(generatedJobId,
+        userConfiguredJobId)) {
+      throw new SamzaException(String.format(
+          "Generated job.id = %s from app.id = %s does not match user 
configured job.id = %s, please configure job.id same as app.id",
+          generatedJobId, originalConfig.get(ApplicationConfig.APP_ID), 
userConfiguredJobId));
+    }
+  }
+
   JobConfig generateJobConfig(JobNode jobNode, String executionPlanJson) {
     if (jobNode.isLegacyTaskApplication()) {
       return new JobConfig(jobNode.getConfig());
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index b569437..f495870 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -26,12 +26,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
@@ -82,7 +84,6 @@ public abstract class JobPlanner {
             userConfig.get(TaskConfig.INPUT_STREAMS()));
         allowedUserConfig.remove(TaskConfig.INPUT_STREAMS());
       }
-
       generatedConfig.putAll(getGeneratedConfig(runId));
     }
 
@@ -155,4 +156,43 @@ public abstract class JobPlanner {
         systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), 
dsd.getSystemName()));
     return systemStreamConfigs;
   }
+
+  /**
+   * Generates configs for a single job in app, job.id from app.id and 
job.name from app.name config
+   * If both job.id and app.id is defined, app.id takes precedence and job.id 
is set to value of app.id
+   * If both job.name and app.name is defined, app.name takes precedence and 
job.name is set to value of app.name
+   *
+   * @param userConfigs configs passed from user
+   *
+   */
+  public static MapConfig generateSingleJobConfig(Map<String, String> 
userConfigs) {
+    Map<String, String> generatedConfig = new HashMap<>(userConfigs);
+
+    if (!userConfigs.containsKey(JobConfig.JOB_NAME()) && 
!userConfigs.containsKey(ApplicationConfig.APP_NAME)) {
+      throw new SamzaException("Samza app name should not be null, Please set 
either app.name (preferred) or job.name (deprecated) in configs");
+    }
+
+    if (userConfigs.containsKey(JobConfig.JOB_ID())) {
+      LOG.warn("{} is a deprecated configuration, use app.id instead.", 
JobConfig.JOB_ID());
+    }
+
+    if (userConfigs.containsKey(JobConfig.JOB_NAME())) {
+      LOG.warn("{} is a deprecated configuration, use use app.name instead.", 
JobConfig.JOB_NAME());
+    }
+
+    if (userConfigs.containsKey(ApplicationConfig.APP_NAME)) {
+      String appName =  userConfigs.get(ApplicationConfig.APP_NAME);
+      LOG.info("app.name is defined, generating job.name equal to app.name 
value: {}", appName);
+      generatedConfig.put(JobConfig.JOB_NAME(), appName);
+    }
+
+    if (userConfigs.containsKey(ApplicationConfig.APP_ID)) {
+      String appId =  userConfigs.get(ApplicationConfig.APP_ID);
+      LOG.info("app.id is defined, generating job.id equal to app.name value: 
{}", appId);
+      generatedConfig.put(JobConfig.JOB_ID(), appId);
+    }
+
+    return new MapConfig(generatedConfig);
+  }
+
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index da4fab2..9969b9d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -29,6 +29,7 @@ import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.context.ExternalContext;
+import org.apache.samza.execution.JobPlanner;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
@@ -84,7 +85,7 @@ public class RemoteApplicationRunner implements 
ApplicationRunner {
     // since currently we only support single actual remote job, we can get 
its status without
     // building the execution plan.
     try {
-      JobConfig jc = new JobConfig(appDesc.getConfig());
+      JobConfig jc = new 
JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
       LOG.info("Killing job {}", jc.getName());
       JobRunner runner = new JobRunner(jc);
       runner.kill();
@@ -98,7 +99,7 @@ public class RemoteApplicationRunner implements 
ApplicationRunner {
     // since currently we only support single actual remote job, we can get 
its status without
     // building the execution plan
     try {
-      JobConfig jc = new JobConfig(appDesc.getConfig());
+      JobConfig jc = new 
JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
       return getApplicationStatus(jc);
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
@@ -112,7 +113,7 @@ public class RemoteApplicationRunner implements 
ApplicationRunner {
 
   @Override
   public boolean waitForFinish(Duration timeout) {
-    JobConfig jobConfig = new JobConfig(appDesc.getConfig());
+    JobConfig jobConfig = new 
JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
     boolean finished = true;
     long timeoutInMs = timeout.toMillis();
     long startTimeInMs = System.currentTimeMillis();
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index bb3b04d..25f7a17 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -20,29 +20,26 @@
 package org.apache.samza.checkpoint
 
 import java.net.URI
+import java.util
 import java.util.regex.Pattern
 
 import joptsimple.ArgumentAcceptingOptionSpec
 import joptsimple.OptionSet
 import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.Config
-import org.apache.samza.config.ConfigRewriter
-import org.apache.samza.config.JobConfig
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config._
 import org.apache.samza.container.TaskName
-import org.apache.samza.job.JobRunner._
+import org.apache.samza.job.JobRunner.{info, warn, _}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.CommandLine
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.{CommandLine, Logging, Util}
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 
 import scala.collection.JavaConverters._
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.execution.JobPlanner
 import org.apache.samza.storage.ChangelogStreamManager
 
 import scala.collection.mutable.ListBuffer
@@ -152,8 +149,9 @@ object CheckpointTool {
   def main(args: Array[String]) {
     val cmdline = new CheckpointToolCommandLine
     val options = cmdline.parser.parse(args: _*)
-    val config = cmdline.loadConfig(options)
-    val rewrittenConfig = rewriteConfig(new JobConfig(config))
+    val userConfig = cmdline.loadConfig(options)
+    val jobConfig = JobPlanner.generateSingleJobConfig(userConfig)
+    val rewrittenConfig = rewriteConfig(new JobConfig(jobConfig))
     info(s"Using the rewritten config: $rewrittenConfig")
     val tool = CheckpointTool(rewrittenConfig, cmdline.newOffsets)
     tool.run()
diff --git 
a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
index 141d31b..d1fab98 100644
--- 
a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
@@ -28,8 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ApplicationContainerContextFactory;
 import org.apache.samza.context.ApplicationTaskContextFactory;
@@ -428,33 +428,33 @@ public class TestStreamApplicationDescriptorImpl {
   @Test
   public void testGetNextOpIdIncrementsId() {
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(JobConfig.JOB_NAME(), "jobName");
-    configMap.put(JobConfig.JOB_ID(), "1234");
+    configMap.put(ApplicationConfig.APP_NAME, "appName");
+    configMap.put(ApplicationConfig.APP_ID, "1234");
     Config config = new MapConfig(configMap);
 
     StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, config);
-    assertEquals("jobName-1234-merge-0", 
streamAppDesc.getNextOpId(OpCode.MERGE, null));
-    assertEquals("jobName-1234-join-customName", 
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
-    assertEquals("jobName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP, 
null));
+    assertEquals("appName-1234-merge-0", 
streamAppDesc.getNextOpId(OpCode.MERGE, null));
+    assertEquals("appName-1234-join-customName", 
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
+    assertEquals("appName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP, 
null));
   }
 
   @Test(expected = SamzaException.class)
   public void testGetNextOpIdRejectsDuplicates() {
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(JobConfig.JOB_NAME(), "jobName");
-    configMap.put(JobConfig.JOB_ID(), "1234");
+    configMap.put(ApplicationConfig.APP_NAME, "appName");
+    configMap.put(ApplicationConfig.APP_ID, "1234");
     Config config = new MapConfig(configMap);
 
     StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, config);
-    assertEquals("jobName-1234-join-customName", 
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
+    assertEquals("appName-1234-join-customName", 
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
     streamAppDesc.getNextOpId(OpCode.JOIN, "customName"); // should throw
   }
 
   @Test
   public void testOpIdValidation() {
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(JobConfig.JOB_NAME(), "jobName");
-    configMap.put(JobConfig.JOB_ID(), "1234");
+    configMap.put(ApplicationConfig.APP_NAME, "appName");
+    configMap.put(ApplicationConfig.APP_ID, "1234");
     Config config = new MapConfig(configMap);
 
     StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, config);
@@ -575,7 +575,7 @@ public class TestStreamApplicationDescriptorImpl {
 
   private Config getConfig() {
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(JobConfig.JOB_NAME(), "test-job");
+    configMap.put(ApplicationConfig.APP_NAME, "test-job");
     return new MapConfig(configMap);
   }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
index f83487d..ee069fd 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
@@ -21,8 +21,8 @@ package org.apache.samza.coordinator.stream;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.execution.JobPlanner;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.codehaus.jackson.type.TypeReference;
@@ -48,12 +48,12 @@ public class TestCoordinatorStreamWriter {
   @Test
   public void testCoordinatorStream() {
 
-    Map<String, String> configMap = new HashMap<>();
-    configMap.put("systems.coordinatorStreamWriter.samza.factory", 
"org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
-    configMap.put("job.name", "coordinator-stream-writer-test");
-    configMap.put("job.coordinator.system", "coordinatorStreamWriter");
-    Config config = new MapConfig(configMap);
-    coordinatorStreamWriter = new CoordinatorStreamWriter(config);
+    Map<String, String> userConfigs = new HashMap<>();
+    userConfigs.put("systems.coordinatorStreamWriter.samza.factory", 
"org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
+    userConfigs.put("app.name", "coordinator-stream-writer-test");
+    userConfigs.put("job.coordinator.system", "coordinatorStreamWriter");
+    Config generatedConfig = JobPlanner.generateSingleJobConfig(userConfigs);
+    coordinatorStreamWriter = new CoordinatorStreamWriter(generatedConfig);
     boolean exceptionHappened = false;
 
     try {
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 1c4abc6..0962a14 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -19,7 +19,9 @@
 package org.apache.samza.execution;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
+import org.apache.samza.SamzaException;
 import 
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
@@ -236,6 +238,24 @@ public class TestJobNodeConfigurationGenerator extends 
ExecutionPlannerTestBase
     assertEquals("rewritten-system", jobConfig.get(streamCfgToOverride));
   }
 
+  @Test(expected = SamzaException.class)
+  public void testJobNameConfigValidation() {
+    ImmutableMap<String, String> userConfigs =
+        ImmutableMap.of("job.name", "samza-job", "job.id", "1", "app.name", 
"samza-app");
+    ImmutableMap<String, String> generatedConfigs =
+        ImmutableMap.of("job.name", "samza-app", "job.id", "1", "app.name", 
"samza-app");
+    JobNodeConfigurationGenerator.validateJobConfigs(userConfigs, 
generatedConfigs);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testJobIdConfigValidation() {
+    ImmutableMap<String, String> userConfigs =
+        ImmutableMap.of("job.id", "1", "app.id", 
"this-should-take-precedence", "app.name", "samza-app");
+    ImmutableMap<String, String> generatedConfigs =
+        ImmutableMap.of("job.name", "samza-app", "job.id", 
"this-should-take-precedence", "app.name", "samza-app");
+    JobNodeConfigurationGenerator.validateJobConfigs(userConfigs, 
generatedConfigs);
+  }
+
   private void validateTableConfigure(JobConfig jobConfig, Map<String, Serde> 
deserializedSerdes,
       TableDescriptor tableDescriptor) {
     Config tableConfig = jobConfig.subset(String.format("tables.%s.", 
tableDescriptor.getTableId()));
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
new file mode 100644
index 0000000..49bfd7f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobPlanner {
+
+  @Test
+  public void testJobNameIdConfigGeneration() {
+    Map<String, String> testConfig = new HashMap<>();
+    testConfig.put("app.name", "samza-app");
+    testConfig .put("app.id", "id");
+    MapConfig generatedConfig = JobPlanner.generateSingleJobConfig(testConfig);
+    Assert.assertEquals(generatedConfig.get("job.name"), "samza-app");
+    Assert.assertEquals(generatedConfig.get("job.id"), "id");
+  }
+
+  @Test
+  public void testAppConfigPrecedence() {
+    Map<String, String> testConfig = new HashMap<>();
+    testConfig.put("app.name", "samza-app");
+    testConfig .put("app.id", "id");
+    testConfig .put("job.id", "should-not-exist-id");
+    testConfig .put("job.name", "should-not-exist-name");
+    MapConfig generatedConfig = JobPlanner.generateSingleJobConfig(testConfig);
+    Assert.assertEquals(generatedConfig.get("job.name"), "samza-app");
+    Assert.assertEquals(generatedConfig.get("job.id"), "id");
+  }
+
+  @Test
+  public void testJobNameId() {
+    Map<String, String> testConfig = new HashMap<>();
+    testConfig .put("job.id", "should-exist-id");
+    testConfig .put("job.name", "should-exist-name");
+    MapConfig generatedConfig = JobPlanner.generateSingleJobConfig(testConfig);
+    Assert.assertEquals(generatedConfig.get("job.name"), "should-exist-name");
+    Assert.assertEquals(generatedConfig.get("job.id"), "should-exist-id");
+  }
+
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index c691500..5e91a2a 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -75,8 +75,8 @@ public class TestLocalApplicationRunner {
   public void testRunStreamTask() {
     final Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
-    cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
-    cfgs.put(JobConfig.JOB_ID(), "jobId");
+    cfgs.put(ApplicationConfig.APP_NAME, "test-app");
+    cfgs.put(ApplicationConfig.APP_ID, "test-appId");
     config = new MapConfig(cfgs);
     mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
     prepareTest();
@@ -108,8 +108,8 @@ public class TestLocalApplicationRunner {
   public void testRunStreamTaskWithoutExternalContext() {
     final Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
-    cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
-    cfgs.put(JobConfig.JOB_ID(), "jobId");
+    cfgs.put(ApplicationConfig.APP_NAME, "test-app");
+    cfgs.put(ApplicationConfig.APP_ID, "test-appId");
     config = new MapConfig(cfgs);
     mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
     prepareTest();
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
index 702cbfb..d07f02e 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -55,6 +56,7 @@ public class TestRemoteApplicationRunner {
   @Before
   public void setUp() {
     Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.APP_NAME, "test-app");
     StreamApplication userApp = appDesc -> { };
     runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(config)));
   }
diff --git 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index dadccd9..244fd8f 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -27,10 +27,7 @@ import 
org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.container.TaskName
 import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
 import org.apache.samza.checkpoint.TestCheckpointTool.MockSystemFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.SystemConfig
-import org.apache.samza.config.TaskConfig
+import org.apache.samza.config._
 import org.apache.samza.metrics.MetricsRegistry
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system.SystemAdmin
@@ -45,10 +42,11 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
+
 import scala.collection.JavaConverters._
-import org.apache.samza.config.JobConfig
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.execution.JobPlanner
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = _
@@ -81,8 +79,8 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
 
   @Before
   def setup() {
-    config = new MapConfig(Map(
-      JobConfig.JOB_NAME -> "test",
+    val userDefinedConfig: MapConfig = new MapConfig(Map(
+      ApplicationConfig.APP_NAME -> "test",
       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
       TaskConfig.INPUT_STREAMS -> "test.foo",
       TaskConfig.CHECKPOINT_MANAGER_FACTORY -> 
classOf[MockCheckpointManagerFactory].getName,
@@ -90,6 +88,7 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
       SystemConfig.SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName,
       TaskConfig.GROUPER_FACTORY -> 
"org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
     ).asJava)
+    config = JobPlanner.generateSingleJobConfig(userDefinedConfig)
     val metadata = new SystemStreamMetadata("foo", Map[Partition, 
SystemStreamPartitionMetadata](
       new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
       new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
@@ -147,8 +146,8 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
     val offsetMap: TaskNameToCheckpointMap = Map(tn0 -> Map(new 
SystemStreamPartition("test", "foo", p0) -> "42"),
       tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
     val coordinatorStreamManager = mock[CoordinatorStreamManager]
-    val userDefinedConfig: Config = new MapConfig(Map(
-      JobConfig.JOB_NAME -> "test",
+    val userDefinedConfig: MapConfig = new MapConfig(Map(
+      ApplicationConfig.APP_NAME -> "test",
       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
       TaskConfig.INPUT_STREAMS -> "test.foo",
       TaskConfig.CHECKPOINT_MANAGER_FACTORY -> 
classOf[MockCheckpointManagerFactory].getName,
@@ -156,8 +155,8 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
       SystemConfig.SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName,
       TaskConfig.GROUPER_FACTORY -> 
"org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
     ).asJava)
-
-    when(coordinatorStreamManager.getConfig).thenReturn(userDefinedConfig)
+    val generatedConfigs: MapConfig = 
JobPlanner.generateSingleJobConfig(userDefinedConfig)
+    when(coordinatorStreamManager.getConfig).thenReturn(generatedConfigs)
 
     val checkpointTool: CheckpointTool = new CheckpointTool(offsetMap, 
coordinatorStreamManager)
     checkpointTool.run()
@@ -167,6 +166,6 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
     verify(TestCheckpointTool.checkpointManager)
       .writeCheckpoint(tn1, new Checkpoint(Map(new 
SystemStreamPartition("test", "foo", p1) -> "43").asJava))
     verify(coordinatorStreamManager).getConfig
-    assert(TestCheckpointTool.coordinatorConfig == userDefinedConfig)
+    assert(TestCheckpointTool.coordinatorConfig == generatedConfigs)
   }
 }
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java 
b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index 71a902c..21709be 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.samza.SamzaException;
+import org.apache.samza.execution.JobPlanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -159,7 +160,7 @@ public class KafkaConsumerConfig extends HashMap<String, 
Object> {
   }
 
   public static Pair<String, String> getJobNameAndId(Config config) {
-    JobConfig jobConfig = new JobConfig(config);
+    JobConfig jobConfig = new 
JobConfig(JobPlanner.generateSingleJobConfig(config));
     Option jobNameOption = jobConfig.getName();
     if (jobNameOption.isEmpty()) {
       throw new ConfigException("Missing job name");
diff --git a/samza-test/src/main/config/join/checker.samza 
b/samza-test/src/main/config/join/checker.samza
index c0e3df7..faef65e 100644
--- a/samza-test/src/main/config/join/checker.samza
+++ b/samza-test/src/main/config/join/checker.samza
@@ -16,8 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Job
-job.name=checker
+# App
+app.name=checker
 
 
systems.kafka.partitioner.class=org.apache.samza.test.integration.join.EpochPartitioner
 systems.kafka.samza.offset.default=oldest
diff --git a/samza-test/src/main/config/join/emitter.samza 
b/samza-test/src/main/config/join/emitter.samza
index 50379a3..ab71849 100644
--- a/samza-test/src/main/config/join/emitter.samza
+++ b/samza-test/src/main/config/join/emitter.samza
@@ -16,8 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Job
-job.name=emitter
+# App
+app.name=emitter
 
 # Task
 task.class=org.apache.samza.test.integration.join.Emitter
diff --git a/samza-test/src/main/config/join/joiner.samza 
b/samza-test/src/main/config/join/joiner.samza
index a138e9e..3e15f07 100644
--- a/samza-test/src/main/config/join/joiner.samza
+++ b/samza-test/src/main/config/join/joiner.samza
@@ -16,8 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Job
-job.name=joiner
+# App
+app.name=joiner
 
 # Task
 task.class=org.apache.samza.test.integration.join.Joiner
diff --git a/samza-test/src/main/config/join/watcher.samza 
b/samza-test/src/main/config/join/watcher.samza
index 3e2bf7e..05e31d1 100644
--- a/samza-test/src/main/config/join/watcher.samza
+++ b/samza-test/src/main/config/join/watcher.samza
@@ -16,8 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Job
-job.name=watcher
+# App
+app.name=watcher
 
 # Task
 task.class=org.apache.samza.test.integration.join.Watcher
diff --git a/samza-test/src/main/config/standalone.failure.test.properties 
b/samza-test/src/main/config/standalone.failure.test.properties
index d200251..ea0e1dd 100644
--- a/samza-test/src/main/config/standalone.failure.test.properties
+++ b/samza-test/src/main/config/standalone.failure.test.properties
@@ -22,8 +22,6 @@ 
app.class=org.apache.samza.test.integration.TestStandaloneIntegrationApplication
 
 app.name=test-app-name
 app.id=test-app-id
-job.name=test-app-name
-job.id=test-app-id
 
 ## Kafka I/O system properties.
 input.stream.name=standalone_integration_test_kafka_input_topic
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index bfea961..b2c24a2 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -34,6 +34,7 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.InMemorySystemConfig;
@@ -44,6 +45,7 @@ import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.context.ExternalContext;
+import org.apache.samza.execution.JobPlanner;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
 import org.apache.samza.operators.KV;
@@ -80,7 +82,7 @@ import org.slf4j.LoggerFactory;
  *  <ol>
  *    <li>"job.coordination.factory" = {@link 
PassthroughJobCoordinatorFactory}</li>
  *    <li>"task.name.grouper.factory" = {@link 
SingleContainerGrouperFactory}</li>
- *    <li>"job.name" = "test-samza"</li>
+ *    <li>"app.name" = "test-samza"</li>
  *    <li>"processor.id" = "1"</li>
  *    <li>"job.default.system" = {@code JOB_DEFAULT_SYSTEM}</li>
  *    <li>"job.host-affinity.enabled" = "false"</li>
@@ -91,7 +93,7 @@ import org.slf4j.LoggerFactory;
 public class TestRunner {
   private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class);
   private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
-  private static final String JOB_NAME = "samza-test";
+  private static final String APP_NAME = "samza-test";
 
   private Map<String, String> configs;
   private SamzaApplication app;
@@ -104,7 +106,7 @@ public class TestRunner {
   private TestRunner() {
     this.configs = new HashMap<>();
     this.inMemoryScope = RandomStringUtils.random(10, true, true);
-    configs.put(JobConfig.JOB_NAME(), JOB_NAME);
+    configs.put(ApplicationConfig.APP_NAME, APP_NAME);
     configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
     configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY(), 
InMemoryMetadataStoreFactory.class.getCanonicalName());
@@ -268,7 +270,7 @@ public class TestRunner {
     Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), 
"Timeouts should be positive");
     // Cleaning store directories to ensure current run does not pick up state 
from previous run
     deleteStoreDirectories();
-    Config config = new MapConfig(configs);
+    Config config = new MapConfig(JobPlanner.generateSingleJobConfig(configs));
     final LocalApplicationRunner runner = new LocalApplicationRunner(app, 
config);
     runner.run(buildExternalContext(config).orElse(null));
     if (!runner.waitForFinish(timeout)) {
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 16953f0..05f5a35 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -222,7 +222,7 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
       Map<String, String> overriddenConfigs) {
     Map<String, String> configMap = new HashMap<>();
     configMap.put("app.runner.class", 
"org.apache.samza.runtime.LocalApplicationRunner");
-    configMap.put("job.name", appName);
+    configMap.put("app.name", appName);
     configMap.put("app.class", 
streamApplication.getClass().getCanonicalName());
     configMap.put("serializers.registry.json.class", 
"org.apache.samza.serializers.JsonSerdeFactory");
     configMap.put("serializers.registry.string.class", 
"org.apache.samza.serializers.StringSerdeFactory");
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index c9f29eb..cc8cceb 100644
--- 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.cli.ParseException;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
@@ -64,7 +65,7 @@ public class SystemConsumerWithSamzaBench extends 
AbstractSamzaBench {
   public void addMoreSystemConfigs(Properties props) {
     props.put("app.runner.class", LocalApplicationRunner.class.getName());
     List<Integer> partitions = IntStream.range(startPartition, 
endPartition).boxed().collect(Collectors.toList());
-    props.put(JobConfig.JOB_NAME(), "SamzaBench");
+    props.put(ApplicationConfig.APP_NAME, "SamzaBench");
     props.put(JobConfig.PROCESSOR_ID(), "1");
     props.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
     
props.put(String.format(ConfigBasedSspGrouperFactory.CONFIG_STREAM_PARTITIONS, 
streamId),

Reply via email to