Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a288779df -> 2e6ace666


[GOBBLIN-336] Run tasks in a separate process if enabled

Default is disabled.

Testing:
Added a unit test.

Default is disabled.

Testing:
Added a unit test.

Closes #2224 from HappyRay/start-a-task-in-a-new-process


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2e6ace66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2e6ace66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2e6ace66

Branch: refs/heads/master
Commit: 2e6ace6668541811263b69754454e147dd24395b
Parents: a288779
Author: Ray Yang <[email protected]>
Authored: Tue Dec 19 20:14:37 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Dec 19 20:14:37 2017 -0800

----------------------------------------------------------------------
 .gitignore                                      |  3 +
 .../GobblinClusterConfigurationKeys.java        |  3 +
 .../gobblin/cluster/GobblinTaskRunner.java      | 51 ++++++++++---
 .../gobblin/cluster/HelixTaskFactory.java       | 79 ++++++++++++++++++++
 .../gobblin/cluster/ClusterIntegrationTest.java | 16 ++++
 5 files changed, 143 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 79d09d6..4738da5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -58,3 +58,6 @@ package-lock.json
 
 # generated java files
 **/gen-java/
+
+# generated config files by tests
+**/generated-gobblin-cluster.conf

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 3d9759c..8fd9bfd 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -30,6 +30,9 @@ public class GobblinClusterConfigurationKeys {
 
   public static final String GOBBLIN_CLUSTER_PREFIX = "gobblin.cluster.";
 
+  public static final String ENABLE_TASK_IN_SEPARATE_PROCESS =
+      GOBBLIN_CLUSTER_PREFIX + "enableTaskInSeparateProcess";
+
   // General Gobblin Cluster application configuration properties.
   public static final String APPLICATION_NAME_OPTION_NAME = "app_name";
   public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index ca715ba..5f2802c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -75,10 +76,13 @@ import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskStateTracker;
 import org.apache.gobblin.runtime.services.JMXReportingService;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.PathUtils;
 
+import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
+
 
 /**
  * The main class running in the containers managing services for running 
Gobblin
@@ -108,6 +112,7 @@ import org.apache.gobblin.util.PathUtils;
 public class GobblinTaskRunner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinTaskRunner.class);
+  static final java.nio.file.Path CLUSTER_CONF_PATH = 
Paths.get("generated-gobblin-cluster.conf");
 
   static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory";
 
@@ -137,11 +142,15 @@ public class GobblinTaskRunner {
       String taskRunnerId, Config config, Optional<Path> appWorkDirOptional)
       throws Exception {
     this.helixInstanceName = helixInstanceName;
-    this.config = config;
     this.taskRunnerId = taskRunnerId;
 
     Configuration conf = HadoopUtils.newConfiguration();
-    this.fs = buildFileSystem(this.config, conf);
+    this.fs = buildFileSystem(config, conf);
+    Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
+        : GobblinClusterUtils
+            .getAppWorkDirPathFromConfig(config, this.fs, applicationName, 
applicationId);
+
+    this.config = saveConfigToFile(config, appWorkDir);
 
     String zkConnectionString =
         
config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
@@ -156,10 +165,6 @@ public class GobblinTaskRunner {
     TaskExecutor taskExecutor = new TaskExecutor(properties);
     TaskStateTracker taskStateTracker = new 
GobblinHelixTaskStateTracker(properties);
 
-    Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
-        : GobblinClusterUtils
-            .getAppWorkDirPathFromConfig(config, this.fs, applicationName, 
applicationId);
-
     List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker,
         new JMXReportingService(
             ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
@@ -177,14 +182,42 @@ public class GobblinTaskRunner {
 
     // Register task factory for the Helix task state model
     Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
-    taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME,
-        new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, 
taskStateTracker, this.fs,
-            appWorkDir, stateStoreJobConfig, this.helixManager));
+
+    Boolean isRunTaskInSeparateProcessEnabled = 
getIsRunTaskInSeparateProcessEnabled();
+    TaskFactory taskFactory;
+    if (isRunTaskInSeparateProcessEnabled) {
+      LOGGER.info("Running a task in a separate process is enabled.");
+      taskFactory = new HelixTaskFactory(this.containerMetrics, 
CLUSTER_CONF_PATH);
+    } else {
+      taskFactory =
+          new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, 
taskStateTracker,
+              this.fs, appWorkDir, stateStoreJobConfig, this.helixManager);
+    }
+
+    taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, taskFactory);
     this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, 
taskFactoryMap);
     this.helixManager.getStateMachineEngine()
         .registerStateModelFactory("Task", this.taskStateModelFactory);
   }
 
+  private Boolean getIsRunTaskInSeparateProcessEnabled() {
+    Boolean enabled = false;
+    if 
(this.config.hasPath(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS))
 {
+      enabled =
+          
this.config.getBoolean(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS);
+    }
+    return enabled;
+  }
+
+  private Config saveConfigToFile(Config config, Path appWorkDir)
+      throws IOException {
+    Config newConf =
+        config.withValue(CLUSTER_WORK_DIR, 
ConfigValueFactory.fromAnyRef(appWorkDir.toString()));
+    ConfigUtils configUtils = new ConfigUtils(new FileUtils());
+    configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
+    return newConf;
+  }
+
   /**
    * Start this {@link GobblinTaskRunner} instance.
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java
new file mode 100644
index 0000000..ecb97d5
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Map;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.util.GobblinProcessBuilder;
+import org.apache.gobblin.util.SystemPropertiesWrapper;
+
+
+public class HelixTaskFactory implements TaskFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(HelixTaskFactory.class);
+
+  private static final String GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER =
+      "gobblin.cluster.new.helix.task";
+
+  private final Optional<ContainerMetrics> containerMetrics;
+
+  /**
+   * A {@link Counter} to count the number of new {@link GobblinHelixTask}s 
that are created.
+   */
+  private final Optional<Counter> newTasksCounter;
+  private final SingleTaskLauncher launcher;
+
+  public HelixTaskFactory(Optional<ContainerMetrics> containerMetrics, Path 
clusterConfPath) {
+    this.containerMetrics = containerMetrics;
+    if (this.containerMetrics.isPresent()) {
+      this.newTasksCounter = Optional
+          
.of(this.containerMetrics.get().getCounter(GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER));
+    } else {
+      this.newTasksCounter = Optional.absent();
+    }
+    launcher = new SingleTaskLauncher(new GobblinProcessBuilder(), new 
SystemPropertiesWrapper(),
+        clusterConfPath);
+  }
+
+  @Override
+  public Task createNewTask(TaskCallbackContext context) {
+    try {
+      if (this.newTasksCounter.isPresent()) {
+        this.newTasksCounter.get().inc();
+      }
+      Map<String, String> configMap = context.getTaskConfig().getConfigMap();
+      return new SingleHelixTask(this.launcher, configMap);
+    } catch (IOException ioe) {
+      final String msg = "Failed to create a new SingleHelixTask";
+      logger.error(msg, ioe);
+      throw new GobblinClusterException(msg, ioe);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2e6ace66/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index b54db1a..3e40363 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -72,10 +72,23 @@ public class ClusterIntegrationTest {
   private TestingServer _testingZKServer;
   private GobblinTaskRunner _worker;
   private GobblinClusterManager _manager;
+  private boolean _runTaskInSeparateProcess;
 
 
   @Test
   public void simpleJobShouldComplete() throws Exception {
+    runSimpleJobAndVerifyResult();
+  }
+
+  @Test
+  public void simpleJobShouldCompleteInTaskIsolationMode()
+      throws Exception {
+    _runTaskInSeparateProcess = true;
+    runSimpleJobAndVerifyResult();
+  }
+
+  private void runSimpleJobAndVerifyResult()
+      throws Exception {
     init();
     startCluster();
     waitForAndVerifyOutputFiles();
@@ -138,6 +151,9 @@ public class ClusterIntegrationTest {
     String zkConnectionString = _testingZKServer.getConnectString();
     configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY, 
zkConnectionString);
     configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, 
_workPath.toString());
+    if (_runTaskInSeparateProcess) {
+      
configMap.put(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, 
"true");
+    }
     Config config = ConfigFactory.parseMap(configMap);
     return config;
   }

Reply via email to