This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 4347057 SAMZA-2106: Samza app and job config refactor (#920)
4347057 is described below
commit 434705750a454d989f00a2f0afd15a2e20b4cf5b
Author: Sanil Jain <[email protected]>
AuthorDate: Wed Mar 6 17:50:38 2019 -0800
SAMZA-2106: Samza app and job config refactor (#920)
---
.../versioned/jobs/samza-configurations.md | 4 +-
.../StreamApplicationDescriptorImpl.java | 8 +--
.../stream/CoordinatorStreamWriter.java | 6 ++-
.../apache/samza/execution/ExecutionPlanner.java | 8 ++-
.../execution/JobNodeConfigurationGenerator.java | 25 +++++++++
.../org/apache/samza/execution/JobPlanner.java | 42 ++++++++++++++-
.../samza/runtime/RemoteApplicationRunner.java | 7 +--
.../apache/samza/checkpoint/CheckpointTool.scala | 18 +++----
.../TestStreamApplicationDescriptorImpl.java | 24 ++++-----
.../stream/TestCoordinatorStreamWriter.java | 14 ++---
.../TestJobNodeConfigurationGenerator.java | 20 +++++++
.../org/apache/samza/execution/TestJobPlanner.java | 62 ++++++++++++++++++++++
.../samza/runtime/TestLocalApplicationRunner.java | 8 +--
.../samza/runtime/TestRemoteApplicationRunner.java | 2 +
.../samza/checkpoint/TestCheckpointTool.scala | 23 ++++----
.../apache/samza/config/KafkaConsumerConfig.java | 3 +-
samza-test/src/main/config/join/checker.samza | 4 +-
samza-test/src/main/config/join/emitter.samza | 4 +-
samza-test/src/main/config/join/joiner.samza | 4 +-
samza-test/src/main/config/join/watcher.samza | 4 +-
.../main/config/standalone.failure.test.properties | 2 -
.../apache/samza/test/framework/TestRunner.java | 10 ++--
.../StreamApplicationIntegrationTestHarness.java | 2 +-
.../benchmark/SystemConsumerWithSamzaBench.java | 3 +-
24 files changed, 231 insertions(+), 76 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 757d552..4fcc87b 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -50,8 +50,8 @@ These are the basic properties for setting up a Samza
application.
|app.id|1|If you run several instances of your application at the same time,
you need to give each instance a different app.id. This is important, since
otherwise the applications will overwrite each others' checkpoints, and perhaps
interfere with each other in other ways.|
|app.class| |This is __required if running on YARN__. The application to run.
The value is a fully-qualified Java classname, which must implement
StreamApplication. A StreamApplication describes a series of transformations
on the streams.|
|job.factory.class| |This is __required if running on YARN__. The job factory
to use for running this job. <br> The value is a fully-qualified Java
classname, which must implement StreamJobFactory.<br> Samza ships with three
implementations:<br><br>`org.apache.samza.job.yarn.YarnJobFactory`<br>Runs your
job on a YARN grid. See below for YARN-specific
configuration.<br><br>`org.apache.samza.job.local.ThreadJobFactory`<br>__For
dev deployments only.__ Runs your job on your local machine us [...]
-|job.name| |__Required:__ The name of your job. This name appears on the Samza
dashboard, and it is used to tell apart this job's checkpoints from other jobs'
checkpoints.|
-|job.id|1|If you run several instances of your job at the same time, you need
to give each execution a different job.id. This is important, since otherwise
the jobs will overwrite each others' checkpoints, and perhaps interfere with
each other in other ways.|
+|job.name| |_(Deprecated in favor of app.name)_ The name of your job. This
name appears on the Samza dashboard, and it is used to tell apart this job's
checkpoints from other jobs' checkpoints.|
+|job.id|1|_(Deprecated in favor of app.id)_ If you run several instances of
your job at the same time, you need to give each execution a different job.id.
This is important, since otherwise the jobs will overwrite each others'
checkpoints, and perhaps interfere with each other in other ways.|
|job.default.system| |__Required:__ The system-name to use for creating input
or output streams for which the system is not explicitly configured. This
property will also be used as default for `job.coordinator.system`,
`task.checkpoint.system` and `job.changelog.system` if none are defined.|
|task.class| |Used for legacy purposes; replace with `app.class` in new jobs.
The fully-qualified name of the Java class which processes incoming messages
from input streams. The class must implement
[StreamTask](../api/javadocs/org/apache/samza/task/StreamTask.html) or
[AsyncStreamTask](../api/javadocs/org/apache/samza/task/AsyncStreamTask.html),
and may optionally implement
[InitableTask](../api/javadocs/org/apache/samza/task/InitableTask.html),
[ClosableTask](../api/javadocs/org/apach [...]
|job.host-affinity.enabled|false|This property indicates whether host-affinity
is enabled or not. Host-affinity refers to the ability of Samza to request and
allocate a container on the same host every time the job is deployed. When
host-affinity is enabled, Samza makes a "best-effort" to honor the
host-affinity constraint. The property
`cluster-manager.container.request.timeout.ms` determines how long to wait
before de-prioritizing the host-affinity constraint and assigning the containe
[...]
diff --git
a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
index 2632f4b..956d34e 100644
---
a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
+++
b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
@@ -30,8 +30,8 @@ import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.operators.KV;
@@ -166,10 +166,10 @@ public class StreamApplicationDescriptorImpl extends
ApplicationDescriptorImpl<S
throw new SamzaException("Operator ID must not contain spaces or special
characters: " + userDefinedId);
}
- JobConfig jobConfig = new JobConfig(getConfig());
+ ApplicationConfig applicationConfig = new ApplicationConfig(getConfig());
String nextOpId = String.format("%s-%s-%s-%s",
- jobConfig.getName().get(),
- jobConfig.getJobId(),
+ applicationConfig.getAppName(),
+ applicationConfig.getAppId(),
opCode.name().toLowerCase(),
StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() :
String.valueOf(nextOpNum));
if (!operatorIds.add(nextOpId)) {
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index daca6a0..2e857f4 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -22,6 +22,7 @@ package org.apache.samza.coordinator.stream;
import joptsimple.OptionSet;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.execution.JobPlanner;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,12 +115,13 @@ public class CoordinatorStreamWriter {
public static void main(String[] args) {
CoordinatorStreamWriterCommandLine cmdline = new
CoordinatorStreamWriterCommandLine();
OptionSet options = cmdline.parser().parse(args);
- Config config = cmdline.loadConfig(options);
+ Config userConfig = cmdline.loadConfig(options);
+ Config generatedConfig = JobPlanner.generateSingleJobConfig(userConfig);
String type = cmdline.loadType(options);
String key = cmdline.loadKey(options);
String value = cmdline.loadValue(options);
- CoordinatorStreamWriter writer = new CoordinatorStreamWriter(config);
+ CoordinatorStreamWriter writer = new
CoordinatorStreamWriter(generatedConfig);
writer.start();
writer.sendMessage(type, key, value);
writer.stop();
diff --git
a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 0c5e368..48b3f4b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -43,6 +43,7 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
@@ -125,9 +126,12 @@ public class ExecutionPlanner {
Set<TableDescriptor> tables = appDesc.getTableDescriptors();
+ // Generate job.id and job.name configs from app.id and app.name if defined
+ MapConfig generatedJobConfigs = JobPlanner.generateSingleJobConfig(config);
+ String jobName = generatedJobConfigs.get(JobConfig.JOB_NAME());
+ String jobId = generatedJobConfigs.get(JobConfig.JOB_ID(), "1");
+
// For this phase, we have a single job node for the whole DAG
- String jobName = config.get(JobConfig.JOB_NAME());
- String jobId = config.get(JobConfig.JOB_ID(), "1");
JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId);
// Add input streams
diff --git
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 761fb05..92acabd 100644
---
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -31,6 +31,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.JobConfig;
@@ -69,7 +71,9 @@ import org.slf4j.LoggerFactory;
static final String CONFIG_INTERNAL_EXECUTION_PLAN =
"samza.internal.execution.plan";
static Config mergeConfig(Map<String, String> originalConfig, Map<String,
String> generatedConfig) {
+ validateJobConfigs(originalConfig, generatedConfig);
Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
+
originalConfig.forEach((k, v) -> {
if (generatedConfig.containsKey(k) &&
!Objects.equals(generatedConfig.get(k), v)) {
LOG.info("Replacing generated config for key: {} value: {} with
original config value: {}", k, generatedConfig.get(k), v);
@@ -80,6 +84,27 @@ import org.slf4j.LoggerFactory;
return Util.rewriteConfig(new MapConfig(mergedConfig));
}
+ static void validateJobConfigs(Map<String, String> originalConfig,
Map<String, String> generatedConfig) {
+ String userConfiguredJobId = originalConfig.get(JobConfig.JOB_ID());
+ String userConfiguredJobName = originalConfig.get(JobConfig.JOB_NAME());
+ String generatedJobId = generatedConfig.get(JobConfig.JOB_ID());
+ String generatedJobName = generatedConfig.get(JobConfig.JOB_NAME());
+
+ if (generatedJobName != null && userConfiguredJobName != null &&
!StringUtils.equals(generatedJobName,
+ userConfiguredJobName)) {
+ throw new SamzaException(String.format(
+ "Generated job.name = %s from app.name = %s does not match user
configured job.name = %s, please configure job.name same as app.name",
+ generatedJobName, originalConfig.get(ApplicationConfig.APP_NAME),
userConfiguredJobName));
+ }
+
+ if (generatedJobId != null && userConfiguredJobId != null &&
!StringUtils.equals(generatedJobId,
+ userConfiguredJobId)) {
+ throw new SamzaException(String.format(
+ "Generated job.id = %s from app.id = %s does not match user
configured job.id = %s, please configure job.id same as app.id",
+ generatedJobId, originalConfig.get(ApplicationConfig.APP_ID),
userConfiguredJobId));
+ }
+ }
+
JobConfig generateJobConfig(JobNode jobNode, String executionPlanJson) {
if (jobNode.isLegacyTaskApplication()) {
return new JobConfig(jobNode.getConfig());
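The check added in validateJobConfigs above can be illustrated without any Samza classes. The following is a minimal sketch, assuming plain maps and literal config keys; the class name JobConfigValidationSketch, the helper check, and the use of IllegalStateException in place of SamzaException are hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the validation in this commit: a user-supplied
// job.name/job.id must agree with the value generated from app.name/app.id.
class JobConfigValidationSketch {

  static void validate(Map<String, String> original, Map<String, String> generated) {
    check("job.name", "app.name", original, generated);
    check("job.id", "app.id", original, generated);
  }

  private static void check(String jobKey, String appKey,
      Map<String, String> original, Map<String, String> generated) {
    String userValue = original.get(jobKey);
    String generatedValue = generated.get(jobKey);
    // Only a conflict is an error; a missing value on either side is accepted.
    if (userValue != null && generatedValue != null && !Objects.equals(userValue, generatedValue)) {
      throw new IllegalStateException(String.format(
          "Generated %s = %s from %s = %s does not match user configured %s = %s",
          jobKey, generatedValue, appKey, original.get(appKey), jobKey, userValue));
    }
  }

  public static void main(String[] args) {
    Map<String, String> user = new HashMap<>();
    user.put("job.name", "samza-job");
    user.put("app.name", "samza-app");

    Map<String, String> generated = new HashMap<>(user);
    generated.put("job.name", "samza-app"); // derived from app.name

    try {
      validate(user, generated);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

In this sketch a user who sets only app.name, or only job.name, passes validation; the failure case is limited to two explicit values that disagree.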
diff --git
a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index b569437..f495870 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -26,12 +26,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
@@ -82,7 +84,6 @@ public abstract class JobPlanner {
userConfig.get(TaskConfig.INPUT_STREAMS()));
allowedUserConfig.remove(TaskConfig.INPUT_STREAMS());
}
-
generatedConfig.putAll(getGeneratedConfig(runId));
}
@@ -155,4 +156,43 @@ public abstract class JobPlanner {
systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(),
dsd.getSystemName()));
return systemStreamConfigs;
}
+
+ /**
+ * Generates configs for a single job in the app: job.id from app.id and
job.name from app.name.
+ * If both job.id and app.id are defined, app.id takes precedence and job.id
is set to the value of app.id.
+ * If both job.name and app.name are defined, app.name takes precedence and
job.name is set to the value of app.name.
+ *
+ * @param userConfigs configs passed from user
+ *
+ */
+ public static MapConfig generateSingleJobConfig(Map<String, String>
userConfigs) {
+ Map<String, String> generatedConfig = new HashMap<>(userConfigs);
+
+ if (!userConfigs.containsKey(JobConfig.JOB_NAME()) &&
!userConfigs.containsKey(ApplicationConfig.APP_NAME)) {
+ throw new SamzaException("Samza app name should not be null. Please set
either app.name (preferred) or job.name (deprecated) in configs");
+ }
+
+ if (userConfigs.containsKey(JobConfig.JOB_ID())) {
+ LOG.warn("{} is a deprecated configuration, use app.id instead.",
JobConfig.JOB_ID());
+ }
+
+ if (userConfigs.containsKey(JobConfig.JOB_NAME())) {
+ LOG.warn("{} is a deprecated configuration, use app.name instead.",
JobConfig.JOB_NAME());
+ }
+
+ if (userConfigs.containsKey(ApplicationConfig.APP_NAME)) {
+ String appName = userConfigs.get(ApplicationConfig.APP_NAME);
+ LOG.info("app.name is defined, generating job.name equal to app.name
value: {}", appName);
+ generatedConfig.put(JobConfig.JOB_NAME(), appName);
+ }
+
+ if (userConfigs.containsKey(ApplicationConfig.APP_ID)) {
+ String appId = userConfigs.get(ApplicationConfig.APP_ID);
+ LOG.info("app.id is defined, generating job.id equal to app.id value:
{}", appId);
+ generatedConfig.put(JobConfig.JOB_ID(), appId);
+ }
+
+ return new MapConfig(generatedConfig);
+ }
+
}
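The precedence rules implemented by generateSingleJobConfig above can be sketched in isolation. This is a simplified illustration, assuming plain maps and literal keys instead of Samza's JobConfig and ApplicationConfig constants; the class name SingleJobConfigSketch and the use of IllegalArgumentException in place of SamzaException are hypothetical, and the deprecation logging is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the precedence rules in this commit:
// app.name/app.id, when present, overwrite job.name/job.id.
class SingleJobConfigSketch {
  static final String APP_NAME = "app.name";
  static final String APP_ID = "app.id";
  static final String JOB_NAME = "job.name";
  static final String JOB_ID = "job.id";

  static Map<String, String> generateSingleJobConfig(Map<String, String> userConfigs) {
    // At least one of the two name keys must be present.
    if (!userConfigs.containsKey(JOB_NAME) && !userConfigs.containsKey(APP_NAME)) {
      throw new IllegalArgumentException(
          "Set either app.name (preferred) or job.name (deprecated)");
    }
    // Start from a copy so the caller's map is left untouched.
    Map<String, String> generated = new HashMap<>(userConfigs);
    if (userConfigs.containsKey(APP_NAME)) {
      generated.put(JOB_NAME, userConfigs.get(APP_NAME));
    }
    if (userConfigs.containsKey(APP_ID)) {
      generated.put(JOB_ID, userConfigs.get(APP_ID));
    }
    return generated;
  }

  public static void main(String[] args) {
    Map<String, String> cfg = new HashMap<>();
    cfg.put(APP_NAME, "samza-app");
    cfg.put(APP_ID, "id");
    cfg.put(JOB_NAME, "legacy-name");
    cfg.put(JOB_ID, "legacy-id");

    Map<String, String> out = generateSingleJobConfig(cfg);
    // prints "samza-app id": the app.* values win over the job.* values
    System.out.println(out.get(JOB_NAME) + " " + out.get(JOB_ID));
  }
}
```

When only job.name and job.id are supplied, the sketch returns them unchanged, which matches the testJobNameId case in TestJobPlanner from this commit.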
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index da4fab2..9969b9d 100644
---
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -29,6 +29,7 @@ import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.context.ExternalContext;
+import org.apache.samza.execution.JobPlanner;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.JobRunner;
@@ -84,7 +85,7 @@ public class RemoteApplicationRunner implements
ApplicationRunner {
// since currently we only support single actual remote job, we can get
its status without
// building the execution plan.
try {
- JobConfig jc = new JobConfig(appDesc.getConfig());
+ JobConfig jc = new
JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
LOG.info("Killing job {}", jc.getName());
JobRunner runner = new JobRunner(jc);
runner.kill();
@@ -98,7 +99,7 @@ public class RemoteApplicationRunner implements
ApplicationRunner {
// since currently we only support single actual remote job, we can get
its status without
// building the execution plan
try {
- JobConfig jc = new JobConfig(appDesc.getConfig());
+ JobConfig jc = new
JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
return getApplicationStatus(jc);
} catch (Throwable t) {
throw new SamzaException("Failed to get status for application", t);
@@ -112,7 +113,7 @@ public class RemoteApplicationRunner implements
ApplicationRunner {
@Override
public boolean waitForFinish(Duration timeout) {
- JobConfig jobConfig = new JobConfig(appDesc.getConfig());
+ JobConfig jobConfig = new
JobConfig(JobPlanner.generateSingleJobConfig(appDesc.getConfig()));
boolean finished = true;
long timeoutInMs = timeout.toMillis();
long startTimeInMs = System.currentTimeMillis();
diff --git
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index bb3b04d..25f7a17 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -20,29 +20,26 @@
package org.apache.samza.checkpoint
import java.net.URI
+import java.util
import java.util.regex.Pattern
import joptsimple.ArgumentAcceptingOptionSpec
import joptsimple.OptionSet
import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.Config
-import org.apache.samza.config.ConfigRewriter
-import org.apache.samza.config.JobConfig
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config._
import org.apache.samza.container.TaskName
-import org.apache.samza.job.JobRunner._
+import org.apache.samza.job.JobRunner.{info, warn, _}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.CommandLine
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.{CommandLine, Logging, Util}
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import scala.collection.JavaConverters._
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.execution.JobPlanner
import org.apache.samza.storage.ChangelogStreamManager
import scala.collection.mutable.ListBuffer
@@ -152,8 +149,9 @@ object CheckpointTool {
def main(args: Array[String]) {
val cmdline = new CheckpointToolCommandLine
val options = cmdline.parser.parse(args: _*)
- val config = cmdline.loadConfig(options)
- val rewrittenConfig = rewriteConfig(new JobConfig(config))
+ val userConfig = cmdline.loadConfig(options)
+ val jobConfig = JobPlanner.generateSingleJobConfig(userConfig)
+ val rewrittenConfig = rewriteConfig(new JobConfig(jobConfig))
info(s"Using the rewritten config: $rewrittenConfig")
val tool = CheckpointTool(rewrittenConfig, cmdline.newOffsets)
tool.run()
diff --git
a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
index 141d31b..d1fab98 100644
---
a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
+++
b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
@@ -28,8 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ApplicationContainerContextFactory;
import org.apache.samza.context.ApplicationTaskContextFactory;
@@ -428,33 +428,33 @@ public class TestStreamApplicationDescriptorImpl {
@Test
public void testGetNextOpIdIncrementsId() {
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(JobConfig.JOB_NAME(), "jobName");
- configMap.put(JobConfig.JOB_ID(), "1234");
+ configMap.put(ApplicationConfig.APP_NAME, "appName");
+ configMap.put(ApplicationConfig.APP_ID, "1234");
Config config = new MapConfig(configMap);
StreamApplicationDescriptorImpl streamAppDesc = new
StreamApplicationDescriptorImpl(appDesc -> { }, config);
- assertEquals("jobName-1234-merge-0",
streamAppDesc.getNextOpId(OpCode.MERGE, null));
- assertEquals("jobName-1234-join-customName",
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
- assertEquals("jobName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP,
null));
+ assertEquals("appName-1234-merge-0",
streamAppDesc.getNextOpId(OpCode.MERGE, null));
+ assertEquals("appName-1234-join-customName",
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
+ assertEquals("appName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP,
null));
}
@Test(expected = SamzaException.class)
public void testGetNextOpIdRejectsDuplicates() {
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(JobConfig.JOB_NAME(), "jobName");
- configMap.put(JobConfig.JOB_ID(), "1234");
+ configMap.put(ApplicationConfig.APP_NAME, "appName");
+ configMap.put(ApplicationConfig.APP_ID, "1234");
Config config = new MapConfig(configMap);
StreamApplicationDescriptorImpl streamAppDesc = new
StreamApplicationDescriptorImpl(appDesc -> { }, config);
- assertEquals("jobName-1234-join-customName",
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
+ assertEquals("appName-1234-join-customName",
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"); // should throw
}
@Test
public void testOpIdValidation() {
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(JobConfig.JOB_NAME(), "jobName");
- configMap.put(JobConfig.JOB_ID(), "1234");
+ configMap.put(ApplicationConfig.APP_NAME, "appName");
+ configMap.put(ApplicationConfig.APP_ID, "1234");
Config config = new MapConfig(configMap);
StreamApplicationDescriptorImpl streamAppDesc = new
StreamApplicationDescriptorImpl(appDesc -> { }, config);
@@ -575,7 +575,7 @@ public class TestStreamApplicationDescriptorImpl {
private Config getConfig() {
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(JobConfig.JOB_NAME(), "test-job");
+ configMap.put(ApplicationConfig.APP_NAME, "test-job");
return new MapConfig(configMap);
}
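The operator ID format exercised by the tests above (app name, app ID, lower-cased op code, then either the user-defined ID or a running counter) can be sketched as follows. This is an illustration only: the class OpIdSketch is hypothetical, the op code is passed as a plain string rather than Samza's OpCode enum, the character validation of user-defined IDs is omitted, and the counter is assumed to advance on every successful call, which is what the expected values in testGetNextOpIdIncrementsId suggest.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical stand-in for the ID scheme; not the Samza implementation.
class OpIdSketch {
  private final String appName;
  private final String appId;
  private final Set<String> operatorIds = new HashSet<>();
  private int nextOpNum = 0;

  OpIdSketch(String appName, String appId) {
    this.appName = appName;
    this.appId = appId;
  }

  String getNextOpId(String opCode, String userDefinedId) {
    boolean hasUserId = userDefinedId != null && !userDefinedId.trim().isEmpty();
    String suffix = hasUserId ? userDefinedId.trim() : String.valueOf(nextOpNum);
    String nextOpId = String.format("%s-%s-%s-%s",
        appName, appId, opCode.toLowerCase(Locale.ROOT), suffix);
    // Set.add returns false when the ID was already handed out.
    if (!operatorIds.add(nextOpId)) {
      throw new IllegalStateException("Duplicate operator ID: " + nextOpId);
    }
    nextOpNum++; // assumption: the counter advances even when a custom ID is used
    return nextOpId;
  }

  public static void main(String[] args) {
    OpIdSketch ids = new OpIdSketch("appName", "1234");
    System.out.println(ids.getNextOpId("MERGE", null));
    System.out.println(ids.getNextOpId("JOIN", "customName"));
    System.out.println(ids.getNextOpId("MAP", null));
  }
}
```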
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
index f83487d..ee069fd 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
@@ -21,8 +21,8 @@ package org.apache.samza.coordinator.stream;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.execution.JobPlanner;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.codehaus.jackson.type.TypeReference;
@@ -48,12 +48,12 @@ public class TestCoordinatorStreamWriter {
@Test
public void testCoordinatorStream() {
- Map<String, String> configMap = new HashMap<>();
- configMap.put("systems.coordinatorStreamWriter.samza.factory",
"org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
- configMap.put("job.name", "coordinator-stream-writer-test");
- configMap.put("job.coordinator.system", "coordinatorStreamWriter");
- Config config = new MapConfig(configMap);
- coordinatorStreamWriter = new CoordinatorStreamWriter(config);
+ Map<String, String> userConfigs = new HashMap<>();
+ userConfigs.put("systems.coordinatorStreamWriter.samza.factory",
"org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
+ userConfigs.put("app.name", "coordinator-stream-writer-test");
+ userConfigs.put("job.coordinator.system", "coordinatorStreamWriter");
+ Config generatedConfig = JobPlanner.generateSingleJobConfig(userConfigs);
+ coordinatorStreamWriter = new CoordinatorStreamWriter(generatedConfig);
boolean exceptionHappened = false;
try {
diff --git
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 1c4abc6..0962a14 100644
---
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -19,7 +19,9 @@
package org.apache.samza.execution;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
+import org.apache.samza.SamzaException;
import
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
import org.apache.samza.config.Config;
@@ -236,6 +238,24 @@ public class TestJobNodeConfigurationGenerator extends
ExecutionPlannerTestBase
assertEquals("rewritten-system", jobConfig.get(streamCfgToOverride));
}
+ @Test(expected = SamzaException.class)
+ public void testJobNameConfigValidation() {
+ ImmutableMap<String, String> userConfigs =
+ ImmutableMap.of("job.name", "samza-job", "job.id", "1", "app.name",
"samza-app");
+ ImmutableMap<String, String> generatedConfigs =
+ ImmutableMap.of("job.name", "samza-app", "job.id", "1", "app.name",
"samza-app");
+ JobNodeConfigurationGenerator.validateJobConfigs(userConfigs,
generatedConfigs);
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testJobIdConfigValidation() {
+ ImmutableMap<String, String> userConfigs =
+ ImmutableMap.of("job.id", "1", "app.id",
"this-should-take-precedence", "app.name", "samza-app");
+ ImmutableMap<String, String> generatedConfigs =
+ ImmutableMap.of("job.name", "samza-app", "job.id",
"this-should-take-precedence", "app.name", "samza-app");
+ JobNodeConfigurationGenerator.validateJobConfigs(userConfigs,
generatedConfigs);
+ }
+
private void validateTableConfigure(JobConfig jobConfig, Map<String, Serde>
deserializedSerdes,
TableDescriptor tableDescriptor) {
Config tableConfig = jobConfig.subset(String.format("tables.%s.",
tableDescriptor.getTableId()));
diff --git
a/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
b/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
new file mode 100644
index 0000000..49bfd7f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobPlanner {
+
+ @Test
+ public void testJobNameIdConfigGeneration() {
+ Map<String, String> testConfig = new HashMap<>();
+ testConfig.put("app.name", "samza-app");
+ testConfig.put("app.id", "id");
+ MapConfig generatedConfig = JobPlanner.generateSingleJobConfig(testConfig);
+ Assert.assertEquals(generatedConfig.get("job.name"), "samza-app");
+ Assert.assertEquals(generatedConfig.get("job.id"), "id");
+ }
+
+ @Test
+ public void testAppConfigPrecedence() {
+ Map<String, String> testConfig = new HashMap<>();
+ testConfig.put("app.name", "samza-app");
+ testConfig.put("app.id", "id");
+ testConfig.put("job.id", "should-not-exist-id");
+ testConfig.put("job.name", "should-not-exist-name");
+ MapConfig generatedConfig = JobPlanner.generateSingleJobConfig(testConfig);
+ Assert.assertEquals(generatedConfig.get("job.name"), "samza-app");
+ Assert.assertEquals(generatedConfig.get("job.id"), "id");
+ }
+
+ @Test
+ public void testJobNameId() {
+ Map<String, String> testConfig = new HashMap<>();
+ testConfig.put("job.id", "should-exist-id");
+ testConfig.put("job.name", "should-exist-name");
+ MapConfig generatedConfig = JobPlanner.generateSingleJobConfig(testConfig);
+ Assert.assertEquals(generatedConfig.get("job.name"), "should-exist-name");
+ Assert.assertEquals(generatedConfig.get("job.id"), "should-exist-id");
+ }
+
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index c691500..5e91a2a 100644
---
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -75,8 +75,8 @@ public class TestLocalApplicationRunner {
public void testRunStreamTask() {
final Map<String, String> cfgs = new HashMap<>();
cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS,
UUIDGenerator.class.getName());
- cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
- cfgs.put(JobConfig.JOB_ID(), "jobId");
+ cfgs.put(ApplicationConfig.APP_NAME, "test-app");
+ cfgs.put(ApplicationConfig.APP_ID, "test-appId");
config = new MapConfig(cfgs);
mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
prepareTest();
@@ -108,8 +108,8 @@ public class TestLocalApplicationRunner {
public void testRunStreamTaskWithoutExternalContext() {
final Map<String, String> cfgs = new HashMap<>();
cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS,
UUIDGenerator.class.getName());
- cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
- cfgs.put(JobConfig.JOB_ID(), "jobId");
+ cfgs.put(ApplicationConfig.APP_NAME, "test-app");
+ cfgs.put(ApplicationConfig.APP_ID, "test-appId");
config = new MapConfig(cfgs);
mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
prepareTest();
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
index 702cbfb..d07f02e 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -55,6 +56,7 @@ public class TestRemoteApplicationRunner {
@Before
public void setUp() {
Map<String, String> config = new HashMap<>();
+ config.put(ApplicationConfig.APP_NAME, "test-app");
StreamApplication userApp = appDesc -> { };
runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(config)));
}
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index dadccd9..244fd8f 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -27,10 +27,7 @@ import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
import org.apache.samza.container.TaskName
import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
import org.apache.samza.checkpoint.TestCheckpointTool.MockSystemFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.SystemConfig
-import org.apache.samza.config.TaskConfig
+import org.apache.samza.config._
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system.SystemAdmin
@@ -45,10 +42,11 @@ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
+
import scala.collection.JavaConverters._
-import org.apache.samza.config.JobConfig
import org.apache.samza.coordinator.stream.CoordinatorStreamManager
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.execution.JobPlanner
object TestCheckpointTool {
var checkpointManager: CheckpointManager = _
@@ -81,8 +79,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
@Before
def setup() {
- config = new MapConfig(Map(
- JobConfig.JOB_NAME -> "test",
+ val userDefinedConfig: MapConfig = new MapConfig(Map(
+ ApplicationConfig.APP_NAME -> "test",
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
@@ -90,6 +88,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
).asJava)
+ config = JobPlanner.generateSingleJobConfig(userDefinedConfig)
val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
@@ -147,8 +146,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
val offsetMap: TaskNameToCheckpointMap = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
val coordinatorStreamManager = mock[CoordinatorStreamManager]
- val userDefinedConfig: Config = new MapConfig(Map(
- JobConfig.JOB_NAME -> "test",
+ val userDefinedConfig: MapConfig = new MapConfig(Map(
+ ApplicationConfig.APP_NAME -> "test",
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
@@ -156,8 +155,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
).asJava)
-
- when(coordinatorStreamManager.getConfig).thenReturn(userDefinedConfig)
+ val generatedConfigs: MapConfig = JobPlanner.generateSingleJobConfig(userDefinedConfig)
+ when(coordinatorStreamManager.getConfig).thenReturn(generatedConfigs)
val checkpointTool: CheckpointTool = new CheckpointTool(offsetMap, coordinatorStreamManager)
checkpointTool.run()
@@ -167,6 +166,6 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43").asJava))
verify(coordinatorStreamManager).getConfig
- assert(TestCheckpointTool.coordinatorConfig == userDefinedConfig)
+ assert(TestCheckpointTool.coordinatorConfig == generatedConfigs)
}
}
diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index 71a902c..21709be 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.samza.SamzaException;
+import org.apache.samza.execution.JobPlanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -159,7 +160,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
}
public static Pair<String, String> getJobNameAndId(Config config) {
- JobConfig jobConfig = new JobConfig(config);
+ JobConfig jobConfig = new JobConfig(JobPlanner.generateSingleJobConfig(config));
Option jobNameOption = jobConfig.getName();
if (jobNameOption.isEmpty()) {
throw new ConfigException("Missing job name");
diff --git a/samza-test/src/main/config/join/checker.samza b/samza-test/src/main/config/join/checker.samza
index c0e3df7..faef65e 100644
--- a/samza-test/src/main/config/join/checker.samza
+++ b/samza-test/src/main/config/join/checker.samza
@@ -16,8 +16,8 @@
# specific language governing permissions and limitations
# under the License.
-# Job
-job.name=checker
+# App
+app.name=checker
systems.kafka.partitioner.class=org.apache.samza.test.integration.join.EpochPartitioner
systems.kafka.samza.offset.default=oldest
diff --git a/samza-test/src/main/config/join/emitter.samza b/samza-test/src/main/config/join/emitter.samza
index 50379a3..ab71849 100644
--- a/samza-test/src/main/config/join/emitter.samza
+++ b/samza-test/src/main/config/join/emitter.samza
@@ -16,8 +16,8 @@
# specific language governing permissions and limitations
# under the License.
-# Job
-job.name=emitter
+# App
+app.name=emitter
# Task
task.class=org.apache.samza.test.integration.join.Emitter
diff --git a/samza-test/src/main/config/join/joiner.samza b/samza-test/src/main/config/join/joiner.samza
index a138e9e..3e15f07 100644
--- a/samza-test/src/main/config/join/joiner.samza
+++ b/samza-test/src/main/config/join/joiner.samza
@@ -16,8 +16,8 @@
# specific language governing permissions and limitations
# under the License.
-# Job
-job.name=joiner
+# App
+app.name=joiner
# Task
task.class=org.apache.samza.test.integration.join.Joiner
diff --git a/samza-test/src/main/config/join/watcher.samza b/samza-test/src/main/config/join/watcher.samza
index 3e2bf7e..05e31d1 100644
--- a/samza-test/src/main/config/join/watcher.samza
+++ b/samza-test/src/main/config/join/watcher.samza
@@ -16,8 +16,8 @@
# specific language governing permissions and limitations
# under the License.
-# Job
-job.name=watcher
+# App
+app.name=watcher
# Task
task.class=org.apache.samza.test.integration.join.Watcher
diff --git a/samza-test/src/main/config/standalone.failure.test.properties b/samza-test/src/main/config/standalone.failure.test.properties
index d200251..ea0e1dd 100644
--- a/samza-test/src/main/config/standalone.failure.test.properties
+++ b/samza-test/src/main/config/standalone.failure.test.properties
@@ -22,8 +22,6 @@
app.class=org.apache.samza.test.integration.TestStandaloneIntegrationApplication
app.name=test-app-name
app.id=test-app-id
-job.name=test-app-name
-job.id=test-app-id
## Kafka I/O system properties.
input.stream.name=standalone_integration_test_kafka_input_topic
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index bfea961..b2c24a2 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -34,6 +34,7 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.InMemorySystemConfig;
@@ -44,6 +45,7 @@ import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.context.ExternalContext;
+import org.apache.samza.execution.JobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
import org.apache.samza.operators.KV;
@@ -80,7 +82,7 @@ import org.slf4j.LoggerFactory;
* <ol>
* <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li>
* <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
- * <li>"job.name" = "test-samza"</li>
+ * <li>"app.name" = "test-samza"</li>
* <li>"processor.id" = "1"</li>
* <li>"job.default.system" = {@code JOB_DEFAULT_SYSTEM}</li>
* <li>"job.host-affinity.enabled" = "false"</li>
@@ -91,7 +93,7 @@ import org.slf4j.LoggerFactory;
public class TestRunner {
private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class);
private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
- private static final String JOB_NAME = "samza-test";
+ private static final String APP_NAME = "samza-test";
private Map<String, String> configs;
private SamzaApplication app;
@@ -104,7 +106,7 @@ public class TestRunner {
private TestRunner() {
this.configs = new HashMap<>();
this.inMemoryScope = RandomStringUtils.random(10, true, true);
- configs.put(JobConfig.JOB_NAME(), JOB_NAME);
+ configs.put(ApplicationConfig.APP_NAME, APP_NAME);
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY(), InMemoryMetadataStoreFactory.class.getCanonicalName());
@@ -268,7 +270,7 @@ public class TestRunner {
Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive");
// Cleaning store directories to ensure current run does not pick up state from previous run
deleteStoreDirectories();
- Config config = new MapConfig(configs);
+ Config config = new MapConfig(JobPlanner.generateSingleJobConfig(configs));
final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
runner.run(buildExternalContext(config).orElse(null));
if (!runner.waitForFinish(timeout)) {
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 16953f0..05f5a35 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -222,7 +222,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
Map<String, String> overriddenConfigs) {
Map<String, String> configMap = new HashMap<>();
configMap.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
- configMap.put("job.name", appName);
+ configMap.put("app.name", appName);
configMap.put("app.class", streamApplication.getClass().getCanonicalName());
configMap.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory");
configMap.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index c9f29eb..cc8cceb 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.cli.ParseException;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
@@ -64,7 +65,7 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {
public void addMoreSystemConfigs(Properties props) {
props.put("app.runner.class", LocalApplicationRunner.class.getName());
List<Integer> partitions = IntStream.range(startPartition, endPartition).boxed().collect(Collectors.toList());
- props.put(JobConfig.JOB_NAME(), "SamzaBench");
+ props.put(ApplicationConfig.APP_NAME, "SamzaBench");
props.put(JobConfig.PROCESSOR_ID(), "1");
props.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
props.put(String.format(ConfigBasedSspGrouperFactory.CONFIG_STREAM_PARTITIONS,
streamId),