Repository: incubator-gobblin Updated Branches: refs/heads/master a288779df -> 2e6ace666
[GOBBLIN-336] Run tasks in a separate process if enabled. The feature is controlled by gobblin.cluster.enableTaskInSeparateProcess; the default is disabled. Testing: Added a unit test (ClusterIntegrationTest.simpleJobShouldCompleteInTaskIsolationMode). Closes #2224 from HappyRay/start-a-task-in-a-new-process Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2e6ace66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2e6ace66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2e6ace66 Branch: refs/heads/master Commit: 2e6ace6668541811263b69754454e147dd24395b Parents: a288779 Author: Ray Yang <[email protected]> Authored: Tue Dec 19 20:14:37 2017 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Dec 19 20:14:37 2017 -0800 ---------------------------------------------------------------------- .gitignore | 3 + .../GobblinClusterConfigurationKeys.java | 3 + .../gobblin/cluster/GobblinTaskRunner.java | 51 ++++++++++--- .../gobblin/cluster/HelixTaskFactory.java | 79 ++++++++++++++++++++ .../gobblin/cluster/ClusterIntegrationTest.java | 16 ++++ 5 files changed, 143 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 79d09d6..4738da5 100644 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,6 @@ package-lock.json # generated java files **/gen-java/ + +# generated config files by tests +**/generated-gobblin-cluster.conf http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 3d9759c..8fd9bfd 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -30,6 +30,9 @@ public class GobblinClusterConfigurationKeys { public static final String GOBBLIN_CLUSTER_PREFIX = "gobblin.cluster."; + public static final String ENABLE_TASK_IN_SEPARATE_PROCESS = + GOBBLIN_CLUSTER_PREFIX + "enableTaskInSeparateProcess"; + // General Gobblin Cluster application configuration properties. public static final String APPLICATION_NAME_OPTION_NAME = "app_name"; public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index ca715ba..5f2802c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -19,6 +19,7 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.net.URI; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -75,10 +76,13 @@ import org.apache.gobblin.runtime.TaskExecutor; import org.apache.gobblin.runtime.TaskStateTracker; import org.apache.gobblin.runtime.services.JMXReportingService; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.FileUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.JvmUtils; import 
org.apache.gobblin.util.PathUtils; +import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; + /** * The main class running in the containers managing services for running Gobblin @@ -108,6 +112,7 @@ import org.apache.gobblin.util.PathUtils; public class GobblinTaskRunner { private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class); + static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf"); static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory"; @@ -137,11 +142,15 @@ public class GobblinTaskRunner { String taskRunnerId, Config config, Optional<Path> appWorkDirOptional) throws Exception { this.helixInstanceName = helixInstanceName; - this.config = config; this.taskRunnerId = taskRunnerId; Configuration conf = HadoopUtils.newConfiguration(); - this.fs = buildFileSystem(this.config, conf); + this.fs = buildFileSystem(config, conf); + Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get() + : GobblinClusterUtils + .getAppWorkDirPathFromConfig(config, this.fs, applicationName, applicationId); + + this.config = saveConfigToFile(config, appWorkDir); String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); @@ -156,10 +165,6 @@ public class GobblinTaskRunner { TaskExecutor taskExecutor = new TaskExecutor(properties); TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties); - Path appWorkDir = appWorkDirOptional.isPresent() ? 
appWorkDirOptional.get() - : GobblinClusterUtils - .getAppWorkDirPathFromConfig(config, this.fs, applicationName, applicationId); - List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker, new JMXReportingService( ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); @@ -177,14 +182,42 @@ public class GobblinTaskRunner { // Register task factory for the Helix task state model Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); - taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, - new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, this.fs, - appWorkDir, stateStoreJobConfig, this.helixManager)); + + Boolean isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled(); + TaskFactory taskFactory; + if (isRunTaskInSeparateProcessEnabled) { + LOGGER.info("Running a task in a separate process is enabled."); + taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH); + } else { + taskFactory = + new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, + this.fs, appWorkDir, stateStoreJobConfig, this.helixManager); + } + + taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, taskFactory); this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, taskFactoryMap); this.helixManager.getStateMachineEngine() .registerStateModelFactory("Task", this.taskStateModelFactory); } + private Boolean getIsRunTaskInSeparateProcessEnabled() { + Boolean enabled = false; + if (this.config.hasPath(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS)) { + enabled = + this.config.getBoolean(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS); + } + return enabled; + } + + private Config saveConfigToFile(Config config, Path appWorkDir) + throws IOException { + Config newConf = + config.withValue(CLUSTER_WORK_DIR, ConfigValueFactory.fromAnyRef(appWorkDir.toString())); + ConfigUtils configUtils = new ConfigUtils(new 
FileUtils()); + configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH); + return newConf; + } + /** * Start this {@link GobblinTaskRunner} instance. */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java new file mode 100644 index 0000000..ecb97d5 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Map; + +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Counter; +import com.google.common.base.Optional; + +import org.apache.gobblin.util.GobblinProcessBuilder; +import org.apache.gobblin.util.SystemPropertiesWrapper; + + +public class HelixTaskFactory implements TaskFactory { + + private static final Logger logger = LoggerFactory.getLogger(HelixTaskFactory.class); + + private static final String GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER = + "gobblin.cluster.new.helix.task"; + + private final Optional<ContainerMetrics> containerMetrics; + + /** + * A {@link Counter} to count the number of new {@link GobblinHelixTask}s that are created. + */ + private final Optional<Counter> newTasksCounter; + private final SingleTaskLauncher launcher; + + public HelixTaskFactory(Optional<ContainerMetrics> containerMetrics, Path clusterConfPath) { + this.containerMetrics = containerMetrics; + if (this.containerMetrics.isPresent()) { + this.newTasksCounter = Optional + .of(this.containerMetrics.get().getCounter(GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER)); + } else { + this.newTasksCounter = Optional.absent(); + } + launcher = new SingleTaskLauncher(new GobblinProcessBuilder(), new SystemPropertiesWrapper(), + clusterConfPath); + } + + @Override + public Task createNewTask(TaskCallbackContext context) { + try { + if (this.newTasksCounter.isPresent()) { + this.newTasksCounter.get().inc(); + } + Map<String, String> configMap = context.getTaskConfig().getConfigMap(); + return new SingleHelixTask(this.launcher, configMap); + } catch (IOException ioe) { + final String msg = "Failed to create a new SingleHelixTask"; + logger.error(msg, ioe); + throw new GobblinClusterException(msg, ioe); + } + } +} + 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java index b54db1a..3e40363 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java @@ -72,10 +72,23 @@ public class ClusterIntegrationTest { private TestingServer _testingZKServer; private GobblinTaskRunner _worker; private GobblinClusterManager _manager; + private boolean _runTaskInSeparateProcess; @Test public void simpleJobShouldComplete() throws Exception { + runSimpleJobAndVerifyResult(); + } + + @Test + public void simpleJobShouldCompleteInTaskIsolationMode() + throws Exception { + _runTaskInSeparateProcess = true; + runSimpleJobAndVerifyResult(); + } + + private void runSimpleJobAndVerifyResult() + throws Exception { init(); startCluster(); waitForAndVerifyOutputFiles(); @@ -138,6 +151,9 @@ public class ClusterIntegrationTest { String zkConnectionString = _testingZKServer.getConnectString(); configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY, zkConnectionString); configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, _workPath.toString()); + if (_runTaskInSeparateProcess) { + configMap.put(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, "true"); + } Config config = ConfigFactory.parseMap(configMap); return config; }
