This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new cb63b275d SAMZA-2735: Support configurable command to run jc in Yarn (#1600)
cb63b275d is described below

commit cb63b275dfe209f6b2a59509d90d32fa8eb1e826
Author: Xinyu Liu <[email protected]>
AuthorDate: Wed Apr 20 11:49:25 2022 -0700

    SAMZA-2735: Support configurable command to run jc in Yarn (#1600)
    
    Co-authored-by: Xinyu Liu <[email protected]>
---
 .../documentation/versioned/jobs/samza-configurations.md      |  7 ++++---
 .../src/main/java/org/apache/samza/config/JobConfig.java      |  7 +++++++
 .../src/test/java/org/apache/samza/config/TestJobConfig.java  | 11 +++++++++++
 .../src/main/scala/org/apache/samza/job/yarn/YarnJob.scala    |  2 +-
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 1ef74cdf5..790e289ec 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -67,15 +67,16 @@ These are the basic properties for setting up a Samza application.
 |--- |--- |--- |
 |job.changelog.system|inherited from job.default.system|This property is 
required if you would like to override the system defined in 
`job.default.system` for the changelog. The changelog will be used with the 
stream specified in `stores.store-name.changelog` config. You can override this 
system by specifying both the system and the stream in 
`stores.store-name.changelog`.|
 |job.coordinator.system|inherited from job.default.system|This property is 
required if you would like to override the system defined in 
`job.default.system` for coordination. The **_system-name_** to use for 
creating and maintaining the Coordinator Stream.|
+|job.coordinator.segment.<br>bytes|26214400|If you are using a Kafka system 
for coordinator stream, this is the segment size to be used for the coordinator 
topic's log segments. Keeping this number small is useful because it increases 
the frequency that Kafka will garbage collect old messages.|
+|job.coordinator.replication.<br>factor|2|If you are using a Kafka system for 
coordinator stream, this is the replication factor to be used for the 
coordinator topic.|
+|job.coordinator.<br>monitor-partition-change.<br>frequency.ms|300000|The 
frequency at which the input streams' partition count change should be 
detected. When the input partition count change is detected, Samza will 
automatically restart a stateless job or fail a stateful job. A longer time 
interval is recommended for jobs w/ large number of input system stream 
partitions, since gathering partition count may incur measurable overhead to 
the job. You can completely disable partition coun [...]
+|job.coordinator.execute|bin/run-jc.sh|The command that starts a Samza job 
coordinator. The script must be included in the job package. There is usually 
no need to customize this.|
 |job.config.rewriter.<br>**_rewriter-name_**.class|(none)|You can optionally 
define configuration rewriters, which have the opportunity to dynamically 
modify the job configuration before the job is started. For example, this can 
be useful for pulling configuration from an external configuration management 
system, or for determining the set of input streams dynamically at runtime. The 
value of this property is a fully-qualified Java classname which must implement 
[ConfigRewriter](../api/j [...]
 |job.config.rewriters|(none)|If you have defined configuration rewriters, you 
need to list them here, in the order in which they should be applied. The value 
of this property is a comma-separated list of **_rewriter-name_** tokens.|
 |job.config.rewriter.<br>**_rewriter-name_**.system|(none)|Set this property 
to the `system-name` of the Kafka system from which you want to consume all 
matching topics.|
 |job.config.rewriter.<br>**_rewriter-name_**.regex|(none)|A regular expression 
specifying which topics you want to consume within the Kafka system 
`job.config.rewriter.*.system`. Any topics matched by this regular expression 
will be consumed in addition to any topics you specify in your application.|
 |job.config.rewriter.<br>**_rewriter-name_**.config.*| |Any properties 
specified within this namespace are applied to the configuration of streams 
that match the regex in `job.config.rewriter.*.regex`. For example, you can set 
`job.config.rewriter.*.config.samza.msg.serde` to configure the deserializer 
for messages in the matching streams, which is equivalent to setting 
`systems.*.streams.*.samza.msg.serde` for each topic that matches the regex.|
 |job.container.thread.<br>pool.size|0|If configured, the container thread pool 
will be used to run synchronous operations of each task [in 
parallel](#../container/event-loop.html). The operations include 
StreamTask.process(), WindowableTask.window(), and internally Task.commit(). If 
not configured and the default value of 0 is used, all task operations will run 
in a single thread.|
-|job.coordinator.<br>monitor-partition-change.<br>frequency.ms|300000|The 
frequency at which the input streams' partition count change should be 
detected. When the input partition count change is detected, Samza will 
automatically restart a stateless job or fail a stateful job. A longer time 
interval is recommended for jobs w/ large number of input system stream 
partitions, since gathering partition count may incur measurable overhead to 
the job. You can completely disable partition coun [...]
-|job.coordinator.segment.<br>bytes|26214400|   If you are using a Kafka system 
for coordinator stream, this is the segment size to be used for the coordinator 
topic's log segments. Keeping this number small is useful because it increases 
the frequency that Kafka will garbage collect old messages.|
-|job.coordinator.replication.<br>factor|300000|The frequency at which the 
input streams' partition count change should be detected. When the input 
partition count change is detected, Samza will automatically restart a 
stateless job or fail a stateful job. A longer time interval is recommended for 
jobs w/ large number of input system stream partitions, since gathering 
partition count may incur measurable overhead to the job. You can completely 
disable partition count monitoring by setting [...]
 
|job.systemstreampartition.<br>grouper.factory|`org.apache.samza.`<br>`container.grouper.stream.`<br>`GroupByPartitionFactory`|A
 factory class that is used to determine how input SystemStreamPartitions are 
grouped together for processing in individual StreamTask instances. The factory 
must implement the SystemStreamPartitionGrouperFactory interface. Once this 
configuration is set, it can't be changed, since doing so could violate state 
semantics, and lead to a loss of data.<br><br>`org.a [...]
 |job.systemstreampartition.<br>matcher.class| |If you want to enable static 
partition assignment, then this is a required configuration. The value of this 
property is a fully-qualified Java class name that implements the interface 
org.apache.samza.system.SystemStreamPartitionMatcher. Samza ships with two 
matcher 
classes:<br><br>`org.apache.samza.system.RangeSystemStreamPartitionMatcher`<br>This
 classes uses a comma separated list of range(s) to determine which partition 
matches, and thus [...]
 |job.systemstreampartition.<br>matcher.config.<br>range| |If 
`job.systemstreampartition.matcher.class` is specified, and the value of this 
property is `org.apache.samza.system.RangeSystemStreamPartitionMatcher`, then 
this property is a required configuration. Specify a comma separated list of 
range(s) to determine which partition matches, and thus statically assigned to 
the Job. For example "2,3,11-20", statically assigns partition 2, 3, and 11 to 
20 for all the specified system and stre [...]
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 638474715..e3658ce03 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -111,6 +111,9 @@ public class JobConfig extends MapConfig {
   public static final String MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms";
   static final int DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000;
 
+  public static final String COORDINATOR_EXECUTE_COMMAND = "job.coordinator.execute";
+  static final String DEFAULT_COORDINATOR_EXECUTE_COMMAND = "bin/run-jc.sh";
+
   public static final String REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex";
   public static final String REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system";
   public static final String REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config";
@@ -490,4 +493,8 @@ public class JobConfig extends MapConfig {
     }
     return elasticityFactor;
   }
+
+  public String getCoordinatorExecuteCommand() {
+    return get(COORDINATOR_EXECUTE_COMMAND, DEFAULT_COORDINATOR_EXECUTE_COMMAND);
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 4d171662c..a037da41b 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -682,4 +682,15 @@ public class TestJobConfig {
     jobConfig = new JobConfig(new MapConfig());
     assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
   }
+
+  @Test
+  public void testGetCoordinatorExecuteCommand() {
+    JobConfig jobConfig = new JobConfig(new MapConfig());
+    assertEquals(JobConfig.DEFAULT_COORDINATOR_EXECUTE_COMMAND, jobConfig.getCoordinatorExecuteCommand());
+
+    String myJcCmd = "bin/run-my-jc.sh";
+    jobConfig = new JobConfig(new MapConfig(
+        Collections.singletonMap(JobConfig.COORDINATOR_EXECUTE_COMMAND, myJcCmd)));
+    assertEquals(myJcCmd, jobConfig.getCoordinatorExecuteCommand());
+  }
 }
\ No newline at end of file
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 237667d07..16ff8a465 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -43,7 +43,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob wit
   def submit: YarnJob = {
     try {
       val jobConfig = new JobConfig(config)
-      val cmdExec = "./__package/bin/run-jc.sh"
+      val cmdExec = "./__package/" + jobConfig.getCoordinatorExecuteCommand
       val environment = YarnJob.buildEnvironment(config, this.yarnConfig, jobConfig)
 
       appId = client.submitApplication(
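
Note on the resulting behavior: the YarnJob hunk above builds the launch command by prefixing `./__package/` to the value of `job.coordinator.execute`, falling back to `bin/run-jc.sh` when the property is unset. The following is a minimal standalone sketch of that resolution, assuming a plain `Map<String, String>` in place of Samza's `JobConfig`. The class `CoordinatorCommandSketch` and the method `resolveCommand` are hypothetical names used only for illustration and are not part of Samza.

```java
import java.util.Map;

// Hypothetical stand-in for illustration only; not part of Samza.
class CoordinatorCommandSketch {
  // Key and default value as they appear in the JobConfig hunk of this commit.
  static final String COORDINATOR_EXECUTE_COMMAND = "job.coordinator.execute";
  static final String DEFAULT_COORDINATOR_EXECUTE_COMMAND = "bin/run-jc.sh";

  // Prefix that the YarnJob hunk prepends to the configured command.
  static final String PACKAGE_PREFIX = "./__package/";

  // Returns the command to launch: the configured script if present,
  // otherwise the default, always relative to the unpacked job package.
  static String resolveCommand(Map<String, String> config) {
    String relative = config.getOrDefault(
        COORDINATOR_EXECUTE_COMMAND, DEFAULT_COORDINATOR_EXECUTE_COMMAND);
    return PACKAGE_PREFIX + relative;
  }
}
```

Under these assumptions, an empty config resolves to `./__package/bin/run-jc.sh`, and setting `job.coordinator.execute=bin/run-my-jc.sh` resolves to `./__package/bin/run-my-jc.sh`. Because the prefix is always applied, the configured value is treated as a path inside the job package, which matches the documentation's statement that the script must be included in the package.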
