This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 764f09c SAMZA-2589: Consolidate Beam and High/Low Samza Apps launch
workflow (#1428)
764f09c is described below
commit 764f09c6239cdec961fe4f52ae97b8f374db107d
Author: Ke Wu <[email protected]>
AuthorDate: Fri Sep 11 15:33:50 2020 -0700
SAMZA-2589: Consolidate Beam and High/Low Samza Apps launch workflow (#1428)
Issues: app.main.class is only set for Beam apps, which causes the AM to
follow a different workflow when launching a job.
Changes:
1. Introduce DefaultApplicationMain to capture the launch workflow for
High/Low level jobs so that, on the AM, all jobs are launched the same way:
ClusterBasedJobCoordinatorRunner#main -> app.main.class ->
JobCoordinatorLaunchUtil
2. Update ApplicationConfig#getAppMainClass to default to
DefaultApplicationMain
Tests:
1. Unit Tests
2. Deployed the hello-samza job successfully with this change, following the
instructions at http://samza.apache.org/startup/hello-samza/latest/
API Changes: None
Upgrade Instructions: None
Usage Instructions: None
---
gradle/dependency-versions.gradle | 2 +-
.../ClusterBasedJobCoordinatorRunner.java | 38 ++++-------
.../clustermanager/DefaultApplicationMain.java | 47 ++++++++++++++
.../org/apache/samza/config/ApplicationConfig.java | 10 +--
.../clustermanager/TestDefaultApplicationMain.java | 74 ++++++++++++++++++++++
5 files changed, 136 insertions(+), 35 deletions(-)
diff --git a/gradle/dependency-versions.gradle
b/gradle/dependency-versions.gradle
index 061f0b7..28acb2a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -36,7 +36,7 @@
jerseyVersion = "2.22.1"
jettyVersion = "9.4.20.v20190813"
jodaTimeVersion = "2.2"
- joptSimpleVersion = "3.2"
+ joptSimpleVersion = "5.0.4"
junitVersion = "4.12"
kafkaVersion = "2.0.1"
log4jVersion = "1.2.17"
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
index 107c473..a152032 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
@@ -26,17 +26,14 @@ import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.model.SamzaObjectMapper;
-import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.SplitDeploymentUtil;
import org.slf4j.Logger;
@@ -94,30 +91,16 @@ public class ClusterBasedJobCoordinatorRunner {
* For Beam jobs, app.main.class will be Beam's main class
* and app.main.args will be Beam's pipeline options.
*/
- if (appConfig.getAppMainClass().isPresent()) {
- String className = appConfig.getAppMainClass().get();
- LOG.info("Invoke main {}", className);
- try {
- Class<?> cls = Class.forName(className);
- Method mainMethod = cls.getMethod("main", String[].class);
- mainMethod.invoke(null, (Object) toArgs(appConfig));
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- } else {
- JobConfig jobConfig = new JobConfig(submissionConfig);
-
- if (!jobConfig.getConfigLoaderFactory().isPresent()) {
- throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is
required to initialize job coordinator from config loader");
- }
-
- // load full job config with ConfigLoader
- Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
-
-
JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig),
originalConfig);
+ String className = appConfig.getAppMainClass();
+ String[] arguments = toArgs(appConfig);
+ LOG.info("Invoke main {} with args {}", className, arguments);
+ try {
+ Class<?> cls = Class.forName(className);
+ Method mainMethod = cls.getMethod("main", String[].class);
+ mainMethod.invoke(null, (Object) arguments);
+ } catch (Exception e) {
+ throw new SamzaException(e);
}
-
- LOG.info("Finished running ClusterBasedJobCoordinator");
} else {
// TODO: Clean this up once SAMZA-2405 is completed when legacy flow is
removed.
Config coordinatorSystemConfig;
@@ -133,8 +116,9 @@ public class ClusterBasedJobCoordinatorRunner {
}
ClusterBasedJobCoordinator jc =
createFromMetadataStore(coordinatorSystemConfig);
jc.run();
- LOG.info("Finished running ClusterBasedJobCoordinator");
}
+
+ LOG.info("Finished running ClusterBasedJobCoordinator");
}
/**
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java
new file mode 100644
index 0000000..9611bf9
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import joptsimple.OptionSet;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.ApplicationRunnerMain;
+import org.apache.samza.util.ConfigUtil;
+
+
+public class DefaultApplicationMain {
+
+ public static void main(String[] args) {
+ run(args);
+ }
+
+ @VisibleForTesting
+ static void run(String[] args) {
+ // This branch is ONLY for Yarn deployments, standalone apps uses offspring
+ final ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new
ApplicationRunnerMain.ApplicationRunnerCommandLine();
+ cmdLine.parser().allowsUnrecognizedOptions();
+
+ final OptionSet options = cmdLine.parser().parse(args);
+ // load full job config with ConfigLoader
+ final Config originalConfig =
ConfigUtil.loadConfig(cmdLine.loadConfig(options));
+
+ JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig),
originalConfig);
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index ea2e943..3dabd05 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -18,7 +18,7 @@
*/
package org.apache.samza.config;
-import java.util.Optional;
+import org.apache.samza.clustermanager.DefaultApplicationMain;
import org.apache.samza.runtime.UUIDGenerator;
@@ -103,12 +103,8 @@ public class ApplicationConfig extends MapConfig {
return ApplicationMode.valueOf(get(APP_MODE,
ApplicationMode.STREAM.name()).toUpperCase());
}
- public Optional<String> getAppMainArgs() {
- return Optional.ofNullable(get(APP_MAIN_ARGS));
- }
-
- public Optional<String> getAppMainClass() {
- return Optional.ofNullable(get(APP_MAIN_CLASS));
+ public String getAppMainClass() {
+ return get(APP_MAIN_CLASS, DefaultApplicationMain.class.getName());
}
public String getAppRunnerClass() {
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java
new file mode 100644
index 0000000..57fbed8
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
+import org.apache.samza.util.ConfigUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ ApplicationUtil.class,
+ ConfigUtil.class,
+ JobCoordinatorLaunchUtil.class,
+ ClusterBasedJobCoordinatorRunner.class})
+public class TestDefaultApplicationMain {
+
+ @Test
+ public void testRun() throws Exception {
+ String[] args = new String[] {
+ "--config",
+ JobConfig.CONFIG_LOADER_FACTORY + "=" +
PropertiesConfigLoaderFactory.class.getName(),
+ "--config",
+ PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX +
"path=" + getClass().getResource("/test.properties").getPath()
+ };
+
+ StreamApplication mockApplication = mock(StreamApplication.class);
+ Config mockConfig = mock(Config.class);
+ mockStatic(JobCoordinatorLaunchUtil.class, ApplicationUtil.class,
ConfigUtil.class);
+
+ when(ApplicationUtil.fromConfig(any()))
+ .thenReturn(mockApplication);
+ when(ConfigUtil.loadConfig(any()))
+ .thenReturn(mockConfig);
+ doNothing()
+ .when(JobCoordinatorLaunchUtil.class, "run",
+ mockApplication, mockConfig);
+ DefaultApplicationMain.run(args);
+
+ verifyStatic(times(1));
+ JobCoordinatorLaunchUtil.run(mockApplication, mockConfig);
+ }
+}