[GOBBLIN-388] Allow classpath to be configured for JVM based task execution in gobblin cluster
Closes #2265 from yukuai518/classpath Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6a31ef84 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6a31ef84 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6a31ef84 Branch: refs/heads/0.12.0 Commit: 6a31ef845bd9617cb5cb8fa8ef53f184c3d6dd88 Parents: 11abf9f Author: Kuai Yu <[email protected]> Authored: Wed Jan 24 21:27:03 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Jan 24 21:27:03 2018 -0800 ---------------------------------------------------------------------- .../cluster/GobblinClusterConfigurationKeys.java | 3 +++ .../org/apache/gobblin/cluster/GobblinTaskRunner.java | 2 +- .../org/apache/gobblin/cluster/HelixTaskFactory.java | 5 +++-- .../org/apache/gobblin/cluster/SingleTaskLauncher.java | 13 +++++++++++-- .../apache/gobblin/cluster/SingleTaskLauncherTest.java | 4 +++- 5 files changed, 21 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 4e78078..de501f1 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -30,8 +30,11 @@ public class GobblinClusterConfigurationKeys { public static final String GOBBLIN_CLUSTER_PREFIX = "gobblin.cluster."; + // Task separation properties public static final String ENABLE_TASK_IN_SEPARATE_PROCESS = GOBBLIN_CLUSTER_PREFIX + "enableTaskInSeparateProcess"; + public static final String TASK_CLASSPATH = + GOBBLIN_CLUSTER_PREFIX + "task.classpath"; // General Gobblin Cluster application configuration properties. public static final String APPLICATION_NAME_OPTION_NAME = "app_name"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index dead73b..8816457 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -192,7 +192,7 @@ public class GobblinTaskRunner { TaskFactory taskFactory; if (isRunTaskInSeparateProcessEnabled) { logger.info("Running a task in a separate process is enabled."); - taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH); + taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH, config); } else { taskFactory = getInProcessTaskFactory(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java index ecb97d5..96ecffc 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import com.codahale.metrics.Counter; import com.google.common.base.Optional; +import com.typesafe.config.Config; import org.apache.gobblin.util.GobblinProcessBuilder; import org.apache.gobblin.util.SystemPropertiesWrapper; @@ -49,7 +50,7 @@ public class HelixTaskFactory implements TaskFactory { private final Optional<Counter> newTasksCounter; private final SingleTaskLauncher launcher; - public HelixTaskFactory(Optional<ContainerMetrics> containerMetrics, Path clusterConfPath) { + public HelixTaskFactory(Optional<ContainerMetrics> containerMetrics, Path clusterConfPath, Config sysConfig) { this.containerMetrics = containerMetrics; if (this.containerMetrics.isPresent()) { this.newTasksCounter = Optional @@ -58,7 +59,7 @@ public class HelixTaskFactory implements TaskFactory { this.newTasksCounter = Optional.absent(); } launcher = new SingleTaskLauncher(new GobblinProcessBuilder(), new SystemPropertiesWrapper(), - clusterConfPath); + clusterConfPath, sysConfig); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java index 10bad09..1fe3eaf 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java @@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory; import org.apache.gobblin.util.GobblinProcessBuilder; import org.apache.gobblin.util.SystemPropertiesWrapper; +import com.typesafe.config.Config; + import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.CLUSTER_CONFIG_FILE_PATH; import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.JOB_ID; import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.WORK_UNIT_FILE_PATH; @@ -40,12 +42,14 @@ class SingleTaskLauncher { private final GobblinProcessBuilder processBuilder; private final SystemPropertiesWrapper propertiesWrapper; private final Path clusterConfigFilePath; + private final Config sysConfig; SingleTaskLauncher(final GobblinProcessBuilder processBuilder, - final SystemPropertiesWrapper propertiesWrapper, final Path clusterConfigFilePath) { + final SystemPropertiesWrapper propertiesWrapper, final Path clusterConfigFilePath, Config sysConfig) { this.processBuilder = processBuilder; this.propertiesWrapper = propertiesWrapper; this.clusterConfigFilePath = clusterConfigFilePath; + this.sysConfig = sysConfig; } Process launch(final String jobId, final Path workUnitFilePath) @@ -94,7 +98,12 @@ class SingleTaskLauncher { private void addClassPath() { this.cmd.add("-cp"); - final String classPath = SingleTaskLauncher.this.propertiesWrapper.getJavaClassPath(); + String classPath; + if (sysConfig.hasPath(GobblinClusterConfigurationKeys.TASK_CLASSPATH)) { + classPath = sysConfig.getString(GobblinClusterConfigurationKeys.TASK_CLASSPATH); + } else { + classPath = SingleTaskLauncher.this.propertiesWrapper.getJavaClassPath(); + } this.cmd.add(classPath); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6a31ef84/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java index a8a361c..afa933d 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java @@ -28,6 +28,8 @@ import org.testng.annotations.Test; import org.apache.gobblin.util.GobblinProcessBuilder; import org.apache.gobblin.util.SystemPropertiesWrapper; +import com.typesafe.config.ConfigFactory; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -56,7 +58,7 @@ public class SingleTaskLauncherTest { final Path clusterConfPath = Paths.get(CLUSTER_CONFIG_CONF_PATH); final SingleTaskLauncher launcher = - new SingleTaskLauncher(processBuilder, propertiesWrapper, clusterConfPath); + new SingleTaskLauncher(processBuilder, propertiesWrapper, clusterConfPath, ConfigFactory.empty()); final Path workUnitPath = Paths.get(WORK_UNIT_PATH); final Process process = launcher.launch(JOB_ID, workUnitPath);
