This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new e4eefe2 SAMZA-2453: Update ClusterBasedJobCoordinator to support Beam jobs (#1309)
e4eefe2 is described below
commit e4eefe2a9b7d6559ee5ce236e5357d773c199816
Author: Ke Wu <[email protected]>
AuthorDate: Tue Mar 10 13:20:39 2020 -0700
SAMZA-2453: Update ClusterBasedJobCoordinator to support Beam jobs (#1309)
---
.../clustermanager/ClusterBasedJobCoordinator.java | 58 ++++++++++++++++++++--
.../TestClusterBasedJobCoordinator.java | 27 ++++++++--
2 files changed, 79 insertions(+), 6 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index f894bcf..b56cb52 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -195,7 +197,7 @@ public class ClusterBasedJobCoordinator {
* @param metadataStore metadata store to hold metadata.
* @param fullJobConfig full job config.
*/
- private ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore, Config fullJobConfig) {
+ public ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore, Config fullJobConfig) {
this.metrics = metrics;
this.metadataStore = metadataStore;
this.config = fullJobConfig;
@@ -551,8 +553,28 @@ public class ClusterBasedJobCoordinator {
throw new SamzaException(e);
}
- ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
- jc.run();
+ ApplicationConfig appConfig = new ApplicationConfig(submissionConfig);
+
+ /*
+ * Invoke app.main.class with app.main.args when present.
+ * For Beam jobs, app.main.class will be Beam's main class
+ * and app.main.args will be Beam's pipeline options.
+ */
+ if (appConfig.getAppMainClass().isPresent()) {
+ String className = appConfig.getAppMainClass().get();
+ LOG.info("Invoke main {}", className);
+ try {
+ Class<?> cls = Class.forName(className);
+ Method mainMethod = cls.getMethod("main", String[].class);
+ mainMethod.invoke(null, (Object) toArgs(appConfig));
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ } else {
+ ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+ jc.run();
+ }
+
LOG.info("Finished running ClusterBasedJobCoordinator");
} else {
// TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
@@ -634,4 +656,34 @@ public class ClusterBasedJobCoordinator {
metadataStore,
config);
}
+
+ /**
+ * Convert Samza config to command line arguments to invoke app.main.class
+ *
+ * @param config Samza config to convert.
+ * @return converted command line arguments.
+ */
+ @VisibleForTesting
+ static String[] toArgs(ApplicationConfig config) {
+ List<String> args = new ArrayList<>(config.size() * 2);
+
+ config.forEach((key, value) -> {
+ if (key.equals(ApplicationConfig.APP_MAIN_ARGS)) {
+ /*
+ * Converts native beam pipeline options such as
+ * --runner=SamzaRunner --maxSourceParallelism=1024
+ */
+ args.addAll(Arrays.asList(value.split("\\s")));
+ } else {
+ /*
+ * Converts native Samza configs to config override format such as
+ * --config job.name=test
+ */
+ args.add("--config");
+ args.add(String.format("%s=%s", key, value));
+ }
+ });
+
+ return args.toArray(new String[0]);
+ }
}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 787edf2..513c075 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -19,10 +19,13 @@
package org.apache.samza.clustermanager;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
@@ -55,9 +58,7 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.any;
@@ -240,4 +241,24 @@ public class TestClusterBasedJobCoordinator {
verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class),
eq(mockCoordinatorStreamStore), eq(fullJobConfig));
}
+
+ @Test
+ public void testToArgs() {
+ ApplicationConfig appConfig = new ApplicationConfig(new MapConfig(ImmutableMap.of(
+ JobConfig.JOB_NAME, "test1",
+ ApplicationConfig.APP_CLASS, "class1",
+ ApplicationConfig.APP_MAIN_ARGS, "--runner=SamzaRunner --maxSourceParallelism=1024"
+ )));
+
+ List<String> expected = Arrays.asList(
+ "--config", "job.name=test1",
+ "--config", "app.class=class1",
+ "--runner=SamzaRunner",
+ "--maxSourceParallelism=1024");
+ List<String> actual = Arrays.asList(ClusterBasedJobCoordinator.toArgs(appConfig));
+
+ // cannot assert expected equals to actual as the order can be different.
+ assertEquals(expected.size(), actual.size());
+ assertTrue(actual.containsAll(expected));
+ }
}