This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e4eefe2  SAMZA-2453: Update ClusterBasedJobCoordinator to support Beam jobs (#1309)
e4eefe2 is described below

commit e4eefe2a9b7d6559ee5ce236e5357d773c199816
Author: Ke Wu <[email protected]>
AuthorDate: Tue Mar 10 13:20:39 2020 -0700

    SAMZA-2453: Update ClusterBasedJobCoordinator to support Beam jobs (#1309)
---
 .../clustermanager/ClusterBasedJobCoordinator.java | 58 ++++++++++++++++++++--
 .../TestClusterBasedJobCoordinator.java            | 27 ++++++++--
 2 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index f894bcf..b56cb52 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -195,7 +197,7 @@ public class ClusterBasedJobCoordinator {
    * @param metadataStore metadata store to hold metadata.
    * @param fullJobConfig full job config.
    */
-  private ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore, Config fullJobConfig) {
+  public ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore, Config fullJobConfig) {
     this.metrics = metrics;
     this.metadataStore = metadataStore;
     this.config = fullJobConfig;
@@ -551,8 +553,28 @@ public class ClusterBasedJobCoordinator {
         throw new SamzaException(e);
       }
 
-      ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
-      jc.run();
+      ApplicationConfig appConfig = new ApplicationConfig(submissionConfig);
+
+      /*
+       * Invoke app.main.class with app.main.args when present.
+       * For Beam jobs, app.main.class will be Beam's main class
+       * and app.main.args will be Beam's pipeline options.
+       */
+      if (appConfig.getAppMainClass().isPresent()) {
+        String className = appConfig.getAppMainClass().get();
+        LOG.info("Invoke main {}", className);
+        try {
+          Class<?> cls = Class.forName(className);
+          Method mainMethod = cls.getMethod("main", String[].class);
+          mainMethod.invoke(null, (Object) toArgs(appConfig));
+        } catch (Exception e) {
+          throw new SamzaException(e);
+        }
+      } else {
+        ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+        jc.run();
+      }
+
       LOG.info("Finished running ClusterBasedJobCoordinator");
     } else {
       // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
@@ -634,4 +656,34 @@ public class ClusterBasedJobCoordinator {
         metadataStore,
         config);
   }
+
+  /**
+   * Convert Samza config to command line arguments to invoke app.main.class
+   *
+   * @param config Samza config to convert.
+   * @return converted command line arguments.
+   */
+  @VisibleForTesting
+  static String[] toArgs(ApplicationConfig config) {
+    List<String> args = new ArrayList<>(config.size() * 2);
+
+    config.forEach((key, value) -> {
+        if (key.equals(ApplicationConfig.APP_MAIN_ARGS)) {
+          /*
+           * Converts native beam pipeline options such as
+           * --runner=SamzaRunner --maxSourceParallelism=1024
+           */
+          args.addAll(Arrays.asList(value.split("\\s")));
+        } else {
+          /*
+           * Converts native Samza configs to config override format such as
+           * --config job.name=test
+           */
+          args.add("--config");
+          args.add(String.format("%s=%s", key, value));
+        }
+      });
+
+    return args.toArray(new String[0]);
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 787edf2..513c075 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -19,10 +19,13 @@
 
 package org.apache.samza.clustermanager;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
@@ -55,9 +58,7 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.any;
@@ -240,4 +241,24 @@ public class TestClusterBasedJobCoordinator {
 
     verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
   }
+
+  @Test
+  public void testToArgs() {
+    ApplicationConfig appConfig = new ApplicationConfig(new MapConfig(ImmutableMap.of(
+        JobConfig.JOB_NAME, "test1",
+        ApplicationConfig.APP_CLASS, "class1",
+        ApplicationConfig.APP_MAIN_ARGS, "--runner=SamzaRunner --maxSourceParallelism=1024"
+    )));
+
+    List<String> expected = Arrays.asList(
+        "--config", "job.name=test1",
+        "--config", "app.class=class1",
+        "--runner=SamzaRunner",
+        "--maxSourceParallelism=1024");
+    List<String> actual = Arrays.asList(ClusterBasedJobCoordinator.toArgs(appConfig));
+
+    // cannot assert expected equals to actual as the order can be different.
+    assertEquals(expected.size(), actual.size());
+    assertTrue(actual.containsAll(expected));
+  }
 }

Reply via email to