This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new cb63b275d SAMZA-2735: Support configurable command to run jc in Yarn (#1600)
cb63b275d is described below
commit cb63b275dfe209f6b2a59509d90d32fa8eb1e826
Author: Xinyu Liu <[email protected]>
AuthorDate: Wed Apr 20 11:49:25 2022 -0700
SAMZA-2735: Support configurable command to run jc in Yarn (#1600)
Co-authored-by: Xinyu Liu <[email protected]>
---
.../documentation/versioned/jobs/samza-configurations.md | 7 ++++---
.../src/main/java/org/apache/samza/config/JobConfig.java | 7 +++++++
.../src/test/java/org/apache/samza/config/TestJobConfig.java | 11 +++++++++++
.../src/main/scala/org/apache/samza/job/yarn/YarnJob.scala | 2 +-
4 files changed, 23 insertions(+), 4 deletions(-)
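
For readers skimming the patch, here is a minimal, self-contained sketch of the lookup-and-prefix behavior this commit introduces. It assumes a plain java.util.Map in place of Samza's JobConfig. The class name CoordinatorCommandSketch and the method resolveCommand are hypothetical and not part of the commit. Only the config key, its default value, and the "./__package/" prefix are taken from the diff.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the JobConfig + YarnJob interaction in this patch.
public class CoordinatorCommandSketch {
    // Key and default copied from the JobConfig hunk of the diff.
    static final String COORDINATOR_EXECUTE_COMMAND = "job.coordinator.execute";
    static final String DEFAULT_COORDINATOR_EXECUTE_COMMAND = "bin/run-jc.sh";

    // Mirrors getCoordinatorExecuteCommand() followed by the prefixing done in
    // YarnJob.submit: read the configured command (or fall back to the default)
    // and resolve it relative to the job package directory.
    static String resolveCommand(Map<String, String> config) {
        String relative = config.getOrDefault(
            COORDINATOR_EXECUTE_COMMAND, DEFAULT_COORDINATOR_EXECUTE_COMMAND);
        return "./__package/" + relative;
    }

    public static void main(String[] args) {
        // No override: the default script is used.
        System.out.println(resolveCommand(Collections.<String, String>emptyMap()));
        // prints ./__package/bin/run-jc.sh

        // Override with a custom script, as in the new unit test.
        System.out.println(resolveCommand(
            Collections.singletonMap(COORDINATOR_EXECUTE_COMMAND, "bin/run-my-jc.sh")));
        // prints ./__package/bin/run-my-jc.sh
    }
}
```

Because the configured value is appended to "./__package/", it is expected to be a path relative to the job package root, which is consistent with the documentation note that the script must be included in the job package.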
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 1ef74cdf5..790e289ec 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -67,15 +67,16 @@ These are the basic properties for setting up a Samza application.
 |--- |--- |--- |
 |job.changelog.system|inherited from job.default.system|This property is required if you would like to override the system defined in `job.default.system` for the changelog. The changelog will be used with the stream specified in `stores.store-name.changelog` config. You can override this system by specifying both the system and the stream in `stores.store-name.changelog`.|
 |job.coordinator.system|inherited from job.default.system|This property is required if you would like to override the system defined in `job.default.system` for coordination. The **_system-name_** to use for creating and maintaining the Coordinator Stream.|
+|job.coordinator.segment.<br>bytes|26214400|If you are using a Kafka system for coordinator stream, this is the segment size to be used for the coordinator topic's log segments. Keeping this number small is useful because it increases the frequency that Kafka will garbage collect old messages.|
+|job.coordinator.replication.<br>factor|2|If you are using a Kafka system for coordinator stream, this is the replication factor to be used for the coordinator topic.|
+|job.coordinator.<br>monitor-partition-change.<br>frequency.ms|300000|The frequency at which the input streams' partition count change should be detected. When the input partition count change is detected, Samza will automatically restart a stateless job or fail a stateful job. A longer time interval is recommended for jobs w/ large number of input system stream partitions, since gathering partition count may incur measurable overhead to the job. You can completely disable partition coun [...]
+|job.coordinator.execute|bin/run-jc.sh|The command that starts a Samza job coordinator. The script must be included in the job package. There is usually no need to customize this.|
 |job.config.rewriter.<br>**_rewriter-name_**.class|(none)|You can optionally define configuration rewriters, which have the opportunity to dynamically modify the job configuration before the job is started. For example, this can be useful for pulling configuration from an external configuration management system, or for determining the set of input streams dynamically at runtime. The value of this property is a fully-qualified Java classname which must implement [ConfigRewriter](../api/j [...]
 |job.config.rewriters|(none)|If you have defined configuration rewriters, you need to list them here, in the order in which they should be applied. The value of this property is a comma-separated list of **_rewriter-name_** tokens.|
 |job.config.rewriter.<br>**_rewriter-name_**.system|(none)|Set this property to the `system-name` of the Kafka system from which you want to consume all matching topics.|
 |job.config.rewriter.<br>**_rewriter-name_**.regex|(none)|A regular expression specifying which topics you want to consume within the Kafka system `job.config.rewriter.*.system`. Any topics matched by this regular expression will be consumed in addition to any topics you specify in your application.|
 |job.config.rewriter.<br>**_rewriter-name_**.config.*| |Any properties specified within this namespace are applied to the configuration of streams that match the regex in `job.config.rewriter.*.regex`. For example, you can set `job.config.rewriter.*.config.samza.msg.serde` to configure the deserializer for messages in the matching streams, which is equivalent to setting `systems.*.streams.*.samza.msg.serde` for each topic that matches the regex.|
 |job.container.thread.<br>pool.size|0|If configured, the container thread pool will be used to run synchronous operations of each task [in parallel](#../container/event-loop.html). The operations include StreamTask.process(), WindowableTask.window(), and internally Task.commit(). If not configured and the default value of 0 is used, all task operations will run in a single thread.|
-|job.coordinator.<br>monitor-partition-change.<br>frequency.ms|300000|The frequency at which the input streams' partition count change should be detected. When the input partition count change is detected, Samza will automatically restart a stateless job or fail a stateful job. A longer time interval is recommended for jobs w/ large number of input system stream partitions, since gathering partition count may incur measurable overhead to the job. You can completely disable partition coun [...]
-|job.coordinator.segment.<br>bytes|26214400| If you are using a Kafka system for coordinator stream, this is the segment size to be used for the coordinator topic's log segments. Keeping this number small is useful because it increases the frequency that Kafka will garbage collect old messages.|
-|job.coordinator.replication.<br>factor|300000|The frequency at which the input streams' partition count change should be detected. When the input partition count change is detected, Samza will automatically restart a stateless job or fail a stateful job. A longer time interval is recommended for jobs w/ large number of input system stream partitions, since gathering partition count may incur measurable overhead to the job. You can completely disable partition count monitoring by setting [...]
 |job.systemstreampartition.<br>grouper.factory|`org.apache.samza.`<br>`container.grouper.stream.`<br>`GroupByPartitionFactory`|A factory class that is used to determine how input SystemStreamPartitions are grouped together for processing in individual StreamTask instances. The factory must implement the SystemStreamPartitionGrouperFactory interface. Once this configuration is set, it can't be changed, since doing so could violate state semantics, and lead to a loss of data.<br><br>`org.a [...]
 |job.systemstreampartition.<br>matcher.class| |If you want to enable static partition assignment, then this is a required configuration. The value of this property is a fully-qualified Java class name that implements the interface org.apache.samza.system.SystemStreamPartitionMatcher. Samza ships with two matcher classes:<br><br>`org.apache.samza.system.RangeSystemStreamPartitionMatcher`<br>This classes uses a comma separated list of range(s) to determine which partition matches, and thus [...]
 |job.systemstreampartition.<br>matcher.config.<br>range| |If `job.systemstreampartition.matcher.class` is specified, and the value of this property is `org.apache.samza.system.RangeSystemStreamPartitionMatcher`, then this property is a required configuration. Specify a comma separated list of range(s) to determine which partition matches, and thus statically assigned to the Job. For example "2,3,11-20", statically assigns partition 2, 3, and 11 to 20 for all the specified system and stre [...]
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 638474715..e3658ce03 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -111,6 +111,9 @@ public class JobConfig extends MapConfig {
   public static final String MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms";
   static final int DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000;
 
+  public static final String COORDINATOR_EXECUTE_COMMAND = "job.coordinator.execute";
+  static final String DEFAULT_COORDINATOR_EXECUTE_COMMAND = "bin/run-jc.sh";
+
   public static final String REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex";
   public static final String REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system";
   public static final String REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config";
@@ -490,4 +493,8 @@ public class JobConfig extends MapConfig {
     }
     return elasticityFactor;
   }
+
+  public String getCoordinatorExecuteCommand() {
+    return get(COORDINATOR_EXECUTE_COMMAND, DEFAULT_COORDINATOR_EXECUTE_COMMAND);
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 4d171662c..a037da41b 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -682,4 +682,15 @@ public class TestJobConfig {
     jobConfig = new JobConfig(new MapConfig());
     assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
   }
+
+  @Test
+  public void testGetCoordinatorExecuteCommand() {
+    JobConfig jobConfig = new JobConfig(new MapConfig());
+    assertEquals(JobConfig.DEFAULT_COORDINATOR_EXECUTE_COMMAND, jobConfig.getCoordinatorExecuteCommand());
+
+    String myJcCmd = "bin/run-my-jc.sh";
+    jobConfig = new JobConfig(new MapConfig(
+        Collections.singletonMap(JobConfig.COORDINATOR_EXECUTE_COMMAND, myJcCmd)));
+    assertEquals(myJcCmd, jobConfig.getCoordinatorExecuteCommand());
+  }
 }
\ No newline at end of file
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 237667d07..16ff8a465 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -43,7 +43,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob wit
   def submit: YarnJob = {
     try {
       val jobConfig = new JobConfig(config)
-      val cmdExec = "./__package/bin/run-jc.sh"
+      val cmdExec = "./__package/" + jobConfig.getCoordinatorExecuteCommand
       val environment = YarnJob.buildEnvironment(config, this.yarnConfig, jobConfig)
       appId = client.submitApplication(