Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 291b40bf9 -> e7bb4c40f


[GOBBLIN-506] Add job tags support for gobblin cluster

Closes #2376 from yukuai518/jobta


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e7bb4c40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e7bb4c40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e7bb4c40

Branch: refs/heads/master
Commit: e7bb4c40fc7171e561f5ab02f10a9b6fddeed4b5
Parents: 291b40b
Author: Kuai Yu <[email protected]>
Authored: Mon Jun 4 15:02:52 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Jun 4 15:02:52 2018 -0700

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |   4 +
 .../cluster/GobblinHelixJobLauncher.java        |   6 +-
 .../gobblin/cluster/GobblinTaskRunner.java      | 115 +++-------
 .../gobblin/cluster/TaskRunnerSuiteBase.java    | 113 +++++++++
 .../cluster/TaskRunnerSuiteProcessModel.java    |  62 +++++
 .../cluster/TaskRunnerSuiteThreadModel.java     |  91 ++++++++
 .../gobblin/cluster/ClusterIntegrationTest.java | 228 ++-----------------
 .../cluster/TaskRunnerSuiteForJobTagTest.java   |  72 ++++++
 .../cluster/suite/IntegrationBasicSuite.java    | 220 ++++++++++++++++++
 ...IntegrationDedicatedManagerClusterSuite.java |  54 +++++
 .../cluster/suite/IntegrationJobTagSuite.java   | 202 ++++++++++++++++
 .../suite/IntegrationSeparateProcessSuite.java  |  45 ++++
 .../src/test/resources/BasicCluster.conf        |   4 -
 .../src/test/resources/BasicManager.conf        |  22 ++
 .../src/test/resources/BasicWorker.conf         |  19 ++
 .../gobblin/runtime/ForkThrowableHolder.java    |   2 +-
 16 files changed, 971 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 1ad7445..492edb8 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -63,6 +63,8 @@ public class GobblinClusterConfigurationKeys {
   // Should job be executed in the scheduler thread?
   public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = 
GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread";
   public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true;
+  public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobTag";
+  public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "helixInstanceTags";
 
   /**
    * A path pointing to a directory that contains job execution files to be 
executed by Gobblin. This directory can
@@ -98,4 +100,6 @@ public class GobblinClusterConfigurationKeys {
 
   public static final String HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "jobQueueDeleteTimeoutSeconds";
   public static final long DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 
300;
+
+  public static final String TASK_RUNNER_SUITE_BUILDER = 
GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 8c7bbe1..6b86c5c 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -257,13 +257,17 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     
jobConfigBuilder.setTimeoutPerTask(this.jobContext.getJobState().getPropAsLong(
         ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
         ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000);
-    
+
     jobConfigBuilder.setFailureThreshold(workUnits.size());
     
jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME);
     
jobConfigBuilder.setNumConcurrentTasksPerInstance(ConfigUtils.getInt(jobConfig,
         GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
         
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
 
+    if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY)) {
+      
jobConfigBuilder.setInstanceGroupTag(this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY));
+    }
+
     if 
(Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING))
 {
       jobConfigBuilder.setRebalanceRunningTask(true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index e68774d..b6f04f1 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -58,7 +58,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
@@ -70,22 +69,18 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nonnull;
-import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.runtime.TaskExecutor;
-import org.apache.gobblin.runtime.TaskStateTracker;
-import org.apache.gobblin.runtime.services.JMXReportingService;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.FileUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
 
@@ -124,6 +119,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
 
   private final String helixInstanceName;
 
+  private final String clusterName;
+
   private HelixManager helixManager;
 
   private final ServiceManager serviceManager;
@@ -147,6 +144,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   private final String applicationName;
   private final String applicationId;
   private final Path appWorkPath;
+
   private final MetricContext metricContext;
   private final StandardMetricsBridge.StandardMetrics metrics;
 
@@ -160,21 +158,30 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
 
     Configuration conf = HadoopUtils.newConfiguration();
     this.fs = buildFileSystem(config, conf);
-
     this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
-
     this.config = saveConfigToFile(config);
+    this.clusterName = 
this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
 
     initHelixManager();
 
     this.containerMetrics = buildContainerMetrics();
-    TaskFactoryBuilder builder = new TaskFactoryBuilder(this.config);
-    this.taskStateModelFactory = createTaskStateModelFactory(builder.build());
-    this.metrics = builder.getTaskMetrics();
-    this.metricContext = builder.getMetricContext();
 
-    services.addAll(getServices());
-    if (services.isEmpty()) {
+    String builderStr = ConfigUtils.getString(this.config, 
GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, 
TaskRunnerSuiteBase.Builder.class.getName());
+    TaskRunnerSuiteBase.Builder builder = 
GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
+          new 
ClassAliasResolver(TaskRunnerSuiteBase.Builder.class).resolveClass(builderStr), 
this.config);
+
+    TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
+        .setContainerMetrics(this.containerMetrics)
+        .setFileSystem(this.fs)
+        .setHelixManager(this.helixManager).build();
+
+    this.taskStateModelFactory = 
createTaskStateModelFactory(suite.getTaskFactory());
+    this.metrics = suite.getTaskMetrics();
+    this.metricContext = suite.getMetricContext();
+    this.services.addAll(suite.getServices());
+
+    this.services.addAll(getServices());
+    if (this.services.isEmpty()) {
       this.serviceManager = null;
     } else {
       this.serviceManager = new ServiceManager(services);
@@ -184,38 +191,6 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
         applicationName, helixInstanceName, applicationId, taskRunnerId, 
config, appWorkDirOptional);
   }
 
-  private class TaskFactoryBuilder {
-    private final boolean isRunTaskInSeparateProcessEnabled;
-    private final TaskFactory taskFactory;
-    @Getter
-    private final MetricContext metricContext;
-    @Getter
-    private StandardMetricsBridge.StandardMetrics taskMetrics;
-
-    public TaskFactoryBuilder(Config config) {
-      isRunTaskInSeparateProcessEnabled = 
getIsRunTaskInSeparateProcessEnabled(config);
-      metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.getClass());
-      if (isRunTaskInSeparateProcessEnabled) {
-        logger.info("Running a task in a separate process is enabled.");
-        taskFactory = new 
HelixTaskFactory(GobblinTaskRunner.this.containerMetrics, CLUSTER_CONF_PATH, 
config);
-        taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics();
-      } else {
-        Properties properties = ConfigUtils.configToProperties(config);
-        TaskExecutor taskExecutor = new TaskExecutor(properties);
-        taskFactory = getInProcessTaskFactory(taskExecutor);
-        taskMetrics = new 
GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, 
metricContext);
-      }
-    }
-
-    public TaskFactory build(){
-       return taskFactory;
-    }
-
-    private Boolean getIsRunTaskInSeparateProcessEnabled(Config config) {
-      return ConfigUtils.getBoolean(config, 
GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, false);
-    }
-  }
-
   private Path initAppWorkDir(Config config, Optional<Path> 
appWorkDirOptional) {
     return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : 
GobblinClusterUtils
         .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, 
this.applicationId);
@@ -227,8 +202,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     logger.info("Using ZooKeeper connection string: " + zkConnectionString);
 
     this.helixManager = HelixManagerFactory.getZKHelixManager(
-        
this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
-        this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString);
+        this.clusterName, this.helixInstanceName, InstanceType.PARTICIPANT, 
zkConnectionString);
   }
 
   private TaskStateModelFactory createTaskStateModelFactory(TaskFactory 
factory) {
@@ -242,35 +216,6 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     return taskStateModelFactory;
   }
 
-  private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor) {
-    Properties properties = ConfigUtils.configToProperties(this.config);
-    URI rootPathUri = PathUtils.getRootPath(this.appWorkPath).toUri();
-    Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
-        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
-            ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
-
-    TaskStateTracker taskStateTracker = new 
GobblinHelixTaskStateTracker(properties);
-
-    services.add(taskExecutor);
-    services.add(taskStateTracker);
-    services.add(new JMXReportingService(
-        ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
-
-    TaskFactory taskFactory =
-        new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, 
taskStateTracker, this.fs,
-            this.appWorkPath, stateStoreJobConfig, this.helixManager);
-    return taskFactory;
-  }
-
-  private Boolean getIsRunTaskInSeparateProcessEnabled() {
-    Boolean enabled = false;
-    if 
(this.config.hasPath(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS))
 {
-      enabled =
-          
this.config.getBoolean(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS);
-    }
-    return enabled;
-  }
-
   private Config saveConfigToFile(Config config)
       throws IOException {
     Config newConf = config
@@ -292,6 +237,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
 
     connectHelixManager();
 
+    addInstanceTags();
+
     // Start metric reporting
     if (this.containerMetrics.isPresent()) {
       this.containerMetrics.get()
@@ -373,6 +320,18 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   }
 
   /**
+   * Helix participant cannot pre-configure tags before it connects to ZK. So 
this method can only be invoked after
+   * {@link HelixManager#connect()}. However this will still work because 
tagged jobs won't be sent to a non-tagged instance. Hence
+   * the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance 
with EXAMPLE_INSTANCE_TAG was found.
+   */
+  private void addInstanceTags() {
+    if (this.helixManager.isConnected()) {
+      List<String> tags = ConfigUtils.getStringList(this.config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
+      tags.forEach(tag -> 
helixManager.getClusterManagmentTool().addInstanceTag(this.clusterName, 
this.helixInstanceName, tag));
+    }
+  }
+
+  /**
    * Creates and returns a {@link MessageHandlerFactory} for handling of Helix
    * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
new file mode 100644
index 0000000..080adb0
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.TaskFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * This suite class contains multiple components used by {@link 
GobblinTaskRunner}.
+ * Here is the list of components it contains:
+ * A {@link TaskFactory} : register Helix task state model.
+ * A {@link MetricContext} : create task related metrics.
+ * A {@link StandardMetricsBridge.StandardMetrics} : report task metrics.
+ * A list of {@link Service} : register any runtime services necessary to run 
the tasks.
+ */
+@Slf4j
+public abstract class TaskRunnerSuiteBase {
+  protected TaskFactory taskFactory;
+  protected MetricContext metricContext;
+  protected StandardMetricsBridge.StandardMetrics taskMetrics;
+  protected List<Service> services = Lists.newArrayList();
+
+  protected TaskRunnerSuiteBase(Builder builder) {
+    this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(builder.config), 
this.getClass());
+  }
+
+  protected MetricContext getMetricContext() {
+    return this.metricContext;
+  }
+
+  protected abstract StandardMetricsBridge.StandardMetrics getTaskMetrics();
+
+  protected abstract TaskFactory getTaskFactory();
+
+  protected abstract List<Service> getServices();
+
+  @Getter
+  public static class Builder {
+    private Config config;
+    private HelixManager helixManager;
+    private Optional<ContainerMetrics> containerMetrics;
+    private FileSystem fs;
+    private Path appWorkPath;
+
+    public Builder(Config config) {
+      this.config = config;
+    }
+
+    public Builder setHelixManager(HelixManager manager) {
+      this.helixManager = manager;
+      return this;
+    }
+
+    public Builder setContainerMetrics(Optional<ContainerMetrics> 
containerMetrics) {
+      this.containerMetrics = containerMetrics;
+      return this;
+    }
+
+    public Builder setFileSystem(FileSystem fs) {
+      this.fs = fs;
+      return this;
+    }
+
+    public Builder setAppWorkPath(Path appWorkPath) {
+      this.appWorkPath = appWorkPath;
+      return this;
+    }
+
+    public TaskRunnerSuiteBase build() {
+      if (getIsRunTaskInSeparateProcessEnabled(config)) {
+        return new TaskRunnerSuiteProcessModel(this);
+      } else {
+        return new TaskRunnerSuiteThreadModel(this);
+      }
+    }
+
+    private Boolean getIsRunTaskInSeparateProcessEnabled(Config config) {
+      return ConfigUtils.getBoolean(config, 
GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
new file mode 100644
index 0000000..f54223f
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.List;
+
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import com.google.common.util.concurrent.Service;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+
+/**
+ * A sub-type of {@link TaskRunnerSuiteBase} suite which runs all tasks in 
separate JVMs.
+ *
+ * Please refer to {@link HelixTaskFactory#createNewTask(TaskCallbackContext)}.
+ */
+@Slf4j
+class TaskRunnerSuiteProcessModel extends TaskRunnerSuiteBase {
+
+  TaskRunnerSuiteProcessModel(TaskRunnerSuiteBase.Builder builder) {
+    super(builder);
+    log.info("Running a task in a separate process is enabled.");
+    taskFactory = new HelixTaskFactory(builder.getContainerMetrics(),
+        GobblinTaskRunner.CLUSTER_CONF_PATH,
+        builder.getConfig());
+    taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics();
+  }
+
+  @Override
+  protected StandardMetricsBridge.StandardMetrics getTaskMetrics() {
+    return this.taskMetrics;
+  }
+
+  @Override
+  protected TaskFactory getTaskFactory() {
+    return this.taskFactory;
+  }
+
+  @Override
+  protected List<Service> getServices() {
+    return this.services;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
new file mode 100644
index 0000000..fefa5b6
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.helix.task.TaskFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.services.JMXReportingService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
+/**
+ * A sub-type of {@link TaskRunnerSuiteBase} suite which runs all tasks in a 
thread pool.
+ */
+class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
+  private final TaskExecutor taskExecutor;
+
+  TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) {
+    super(builder);
+    this.taskExecutor = new 
TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
+    this.taskFactory = getInProcessTaskFactory(taskExecutor, builder);
+    this.taskMetrics = new 
GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, 
metricContext);
+  }
+
+  @Override
+  protected StandardMetricsBridge.StandardMetrics getTaskMetrics() {
+    return this.taskMetrics;
+  }
+
+  @Override
+  protected TaskFactory getTaskFactory() {
+    return this.taskFactory;
+  }
+
+  @Override
+  protected List<Service> getServices() {
+    return this.services;
+  }
+
+  private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor, 
Builder builder) {
+    Properties properties = 
ConfigUtils.configToProperties(builder.getConfig());
+    URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri();
+    Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
+        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
+            ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
+
+    TaskStateTracker taskStateTracker = new 
GobblinHelixTaskStateTracker(properties);
+
+    services.add(taskExecutor);
+    services.add(taskStateTracker);
+    services.add(new JMXReportingService(
+        ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
+
+    TaskFactory taskFactory =
+        new GobblinHelixTaskFactory(builder.getContainerMetrics(),
+            taskExecutor,
+            taskStateTracker,
+            builder.getFs(),
+            builder.getAppWorkPath(),
+            stateStoreJobConfig,
+            builder.getHelixManager());
+    return taskFactory;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 58a5210..408ebe2 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -15,239 +15,59 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.gobblin.cluster;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Optional;
-import com.google.common.io.Resources;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
+import 
org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite;
+import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
+import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite;
 
 
 public class ClusterIntegrationTest {
 
-  public final static Logger _logger = 
LoggerFactory.getLogger(ClusterIntegrationTest.class);
-  public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
-  Config _config;
-  private Path _workPath;
-  private Path _jobConfigPath;
-  private Path _jobOutputBasePath;
-  private URL _jobConfResourceUrl;
-  private TestingServer _testingZKServer;
-  private GobblinTaskRunner _worker;
-  private GobblinClusterManager _manager;
-  private boolean _runTaskInSeparateProcess;
-  private boolean _dedicatedClusterManager = false;
+  private IntegrationBasicSuite suite;
 
   @Test
-  public void simpleJobShouldComplete() throws Exception {
-    runSimpleJobAndVerifyResult();
+  public void testJobShouldComplete() throws Exception {
+    this.suite = new IntegrationBasicSuite();
+    runAndVerify();
   }
 
   @Test
-  public void simpleJobShouldCompleteInTaskIsolationMode()
+  public void testSeparateProcessMode()
       throws Exception {
-    _runTaskInSeparateProcess = true;
-    runSimpleJobAndVerifyResult();
+    this.suite = new IntegrationSeparateProcessSuite();
+    runAndVerify();
   }
 
   @Test
-  public void dedicatedManagerClusterMode()
+  public void testDedicatedManagerCluster()
       throws Exception {
-    _dedicatedClusterManager = true;
-    runSimpleJobAndVerifyResult();
+    this.suite = new IntegrationDedicatedManagerClusterSuite();
+    runAndVerify();
   }
 
-  private void runSimpleJobAndVerifyResult()
+  @Test
+  public void testJobWithTag()
       throws Exception {
-    init();
-    startCluster();
-    waitForAndVerifyOutputFiles();
-    shutdownCluster();
-  }
-
-  private void init() throws Exception {
-    initWorkDir();
-    initZooKeeper();
-    initConfig();
-    initJobConfDir();
-    initJobOutputDir();
-  }
-
-  private void initWorkDir() throws IOException {
-    // Relative to the current directory
-    _workPath = Paths.get("gobblin-integration-test-work-dir");
-    _logger.info("Created a new work directory: " + 
_workPath.toAbsolutePath());
-
-    // Delete the working directory in case the previous test fails to delete 
the directory
-    // e.g. when the test was killed forcefully under a debugger.
-    deleteWorkDir();
-    Files.createDirectory(_workPath);
-  }
-
-  private void initJobConfDir() throws IOException {
-    String jobConfigDir = 
_config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY);
-    _jobConfigPath = Paths.get(jobConfigDir);
-    Files.createDirectories(_jobConfigPath);
-    _jobConfResourceUrl = Resources.getResource(JOB_CONF_NAME);
-    copyJobConfFromResource();
-  }
-
-  private void initJobOutputDir() throws IOException {
-    _jobOutputBasePath = Paths.get(_workPath + "/job-output");
-    Files.createDirectory(_jobOutputBasePath);
-  }
-
-  private void copyJobConfFromResource() throws IOException {
-    try (InputStream resourceStream = _jobConfResourceUrl.openStream()) {
-      File targetFile = new File(_jobConfigPath + "/" + JOB_CONF_NAME);
-      FileUtils.copyInputStreamToFile(resourceStream, targetFile);
-    }
+    this.suite = new IntegrationJobTagSuite();
+    runAndVerify();
   }
 
-  private void initZooKeeper() throws Exception {
-    _testingZKServer = new TestingServer(false);
-    _logger.info(
-        "Created testing ZK Server. Connection string : " + 
_testingZKServer.getConnectString());
-  }
-
-  private void initConfig() {
-    Config configFromResource = getConfigFromResource();
-    Config configOverride = getConfigOverride();
-    _config = configOverride.withFallback(configFromResource).resolve();
-  }
-
-  private Config getConfigOverride() {
-    Map<String, String> configMap = new HashMap<>();
-    String zkConnectionString = _testingZKServer.getConnectString();
-    configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY, 
zkConnectionString);
-    configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, 
_workPath.toString());
-    if (_runTaskInSeparateProcess) {
-      
configMap.put(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, 
"true");
-    }
-    if (_dedicatedClusterManager) {
-      
configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,
 "true");
-      configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, 
"ManagerCluster");
-    }
-    Config config = ConfigFactory.parseMap(configMap);
-    return config;
-  }
-
-  private Config getConfigFromResource() {
-    URL url = Resources.getResource("BasicCluster.conf");
-    Config config = ConfigFactory.parseURL(url);
-    return config;
+  private void runAndVerify()
+      throws Exception {
+    suite.startCluster();
+    suite.waitForAndVerifyOutputFiles();
+    suite.shutdownCluster();
   }
 
   @AfterMethod
   public void tearDown() throws IOException {
-    deleteWorkDir();
-  }
-
-  private void deleteWorkDir() throws IOException {
-    if ((_workPath != null) && Files.exists(_workPath)) {
-      FileUtils.deleteDirectory(_workPath.toFile());
-    }
-  }
-
-  private void createHelixCluster() throws Exception {
-    String zkConnectionString = _config
-        .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    String helix_cluster_name = _config
-        .getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
-    HelixUtils.createGobblinHelixCluster(zkConnectionString, 
helix_cluster_name);
-
-    if (_dedicatedClusterManager) {
-      String manager_cluster_name = _config
-          .getString(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY);
-      HelixUtils.createGobblinHelixCluster(zkConnectionString, 
manager_cluster_name);
-    }
-  }
-
-  private void startCluster() throws Exception {
-    _testingZKServer.start();
-    createHelixCluster();
-    startWorker();
-    startManager();
-  }
-
-  private void startWorker() throws Exception {
-    _worker = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, "Worker",
-        TestHelper.TEST_APPLICATION_ID, "1",
-        _config, Optional.absent());
-
-    // Need to run in another thread since the start call will not return 
until the stop method
-    // is called.
-    Thread workerThread = new Thread(_worker::start);
-    workerThread.start();
-  }
-
-  private void startManager() throws Exception {
-    _manager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME,
-        TestHelper.TEST_APPLICATION_ID,
-        _config, Optional.absent());
-
-    _manager.start();
-  }
-
-  private void shutdownCluster() throws InterruptedException, IOException {
-    _worker.stop();
-    _manager.stop();
-    _testingZKServer.close();
-  }
-
-  private void waitForAndVerifyOutputFiles() throws Exception {
-
-    AssertWithBackoff asserter = 
AssertWithBackoff.create().logger(_logger).timeoutMs(60_000)
-        .maxSleepMs(100).backoffFactor(1.5);
-
-    asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for 
job-completion");
-  }
-
-  private boolean hasExpectedFilesBeenCreated(Void input) {
-    int numOfFiles = getNumOfOutputFiles(_jobOutputBasePath);
-    return numOfFiles == 1;
-  }
-
-  private int getNumOfOutputFiles(Path jobOutputDir) {
-    Collection<File> outputFiles = FileUtils
-        .listFiles(jobOutputDir.toFile(), new String[]{"txt"}, true);
-    return outputFiles.size();
+    this.suite.deleteWorkDir();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
new file mode 100644
index 0000000..5f3b2fe
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.testng.Assert;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+/**
+ * A special {@link TaskRunnerSuiteBase} which can verify if the worker gets 
the correct jobs based on the tag association.
+ */
+@Slf4j
+public class TaskRunnerSuiteForJobTagTest extends TaskRunnerSuiteThreadModel {
+  private TaskFactory jobTagTestFactory;
+  private String instanceName;
+
+  public 
TaskRunnerSuiteForJobTagTest(IntegrationJobTagSuite.JobTagTaskRunnerSuiteBuilder
 builder) {
+    super(builder);
+    this.instanceName = builder.getInstanceName();
+    this.jobTagTestFactory = new JobTagTestFactory(this.taskFactory);
+  }
+
+  @Override
+  protected TaskFactory getTaskFactory() {
+    return this.jobTagTestFactory;
+  }
+
+
+  public class JobTagTestFactory implements TaskFactory {
+    private TaskFactory factory;
+    public JobTagTestFactory(TaskFactory factory) {
+      this.factory = factory;
+    }
+
+    @Override
+    public Task createNewTask(TaskCallbackContext context) {
+      Map<String, String> configMap = context.getTaskConfig().getConfigMap();
+      String jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);
+      List<String> allowedJobNames = 
IntegrationJobTagSuite.EXPECTED_JOB_NAMES.get(TaskRunnerSuiteForJobTagTest.this.instanceName);
+      if (allowedJobNames.contains(jobName)) {
+        log.info("{} has job name {}", instanceName, jobName);
+      } else {
+        Assert.fail(instanceName + " should not receive " + jobName);
+      }
+      return this.factory.createNewTask(context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
new file mode 100644
index 0000000..bab8b30
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.suite;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.assertj.core.util.Lists;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Resources;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.ClusterIntegrationTest;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinTaskRunner;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.cluster.TestHelper;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+/**
+ * A test suite used for {@link ClusterIntegrationTest#testJobShouldComplete()}
+ *
+ * This basic suite class provides utilities to launch one manager and 
multiple workers (participants).
+ * User can override {@link IntegrationBasicSuite#getWorkerConfigs()} for 
worker customization.
+ * User can also override {@link 
IntegrationBasicSuite#waitForAndVerifyOutputFiles()} to check different 
successful condition.
+ */
+@Slf4j
+public class IntegrationBasicSuite {
+  public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
+  public static final String WORKER_INSTANCE_0 = "WorkerInstance_0";
+
+  // manager and workers
+  protected Config managerConfig;
+  protected Collection<Config> workerConfigs = Lists.newArrayList();
+  protected Collection<GobblinTaskRunner> workers = Lists.newArrayList();
+  protected GobblinClusterManager manager;
+
+  protected Path workPath;
+  protected Path jobConfigPath;
+  protected Path jobOutputBasePath;
+  protected URL jobConfResourceUrl;
+  protected TestingServer testingZKServer;
+
+  public IntegrationBasicSuite() {
+    try {
+      initWorkDir();
+      initJobOutputDir();
+      initZooKeeper();
+      initConfig();
+      initJobConfDir();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void initConfig() {
+    this.managerConfig = this.getManagerConfig();
+    this.workerConfigs = this.getWorkerConfigs();
+  }
+
+  private void initZooKeeper() throws Exception {
+    this.testingZKServer = new TestingServer(false);
+    log.info(
+        "Created testing ZK Server. Connection string : " + 
testingZKServer.getConnectString());
+  }
+
+  private void initJobConfDir() throws IOException {
+    String jobConfigDir = 
this.managerConfig.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY);
+    this.jobConfigPath = Paths.get(jobConfigDir);
+    Files.createDirectories(this.jobConfigPath);
+    this.jobConfResourceUrl = Resources.getResource(JOB_CONF_NAME);
+    copyJobConfFromResource();
+  }
+
+  private void initJobOutputDir() throws IOException {
+    this.jobOutputBasePath = Paths.get(this.workPath + "/job-output");
+    Files.createDirectory(this.jobOutputBasePath);
+  }
+
+  private void initWorkDir() throws IOException {
+    // Relative to the current directory
+    this.workPath = Paths.get("gobblin-integration-test-work-dir");
+    log.info("Created a new work directory: " + 
this.workPath.toAbsolutePath());
+
+    // Delete the working directory in case the previous test fails to delete 
the directory
+    // e.g. when the test was killed forcefully under a debugger.
+    deleteWorkDir();
+    Files.createDirectory(this.workPath);
+  }
+
+  public void deleteWorkDir() throws IOException {
+    if ((this.workPath != null) && Files.exists(this.workPath)) {
+      FileUtils.deleteDirectory(this.workPath.toFile());
+    }
+  }
+
+  protected void copyJobConfFromResource() throws IOException {
+    try (InputStream resourceStream = this.jobConfResourceUrl.openStream()) {
+      File targetFile = new File(this.jobConfigPath + "/" + JOB_CONF_NAME);
+      FileUtils.copyInputStreamToFile(resourceStream, targetFile);
+    }
+  }
+
+  private Config getClusterConfig() {
+    URL url = Resources.getResource("BasicCluster.conf");
+    Config config = ConfigFactory.parseURL(url);
+
+    Map<String, String> configMap = new HashMap<>();
+    String zkConnectionString = this.testingZKServer.getConnectString();
+    configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY, 
zkConnectionString);
+    configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, 
this.workPath.toString());
+    Config overrideConfig = ConfigFactory.parseMap(configMap);
+
+    return overrideConfig.withFallback(config);
+  }
+
+  protected Config getManagerConfig() {
+    // manager config initialization
+    URL url = Resources.getResource("BasicManager.conf");
+    Config managerConfig = ConfigFactory.parseURL(url);
+    managerConfig = managerConfig.withFallback(getClusterConfig());
+    return managerConfig.resolve();
+  }
+
+  protected Collection<Config> getWorkerConfigs() {
+    // worker config initialization
+    URL url = Resources.getResource("BasicWorker.conf");
+    Config workerConfig = ConfigFactory.parseURL(url);
+    workerConfig = workerConfig.withFallback(getClusterConfig());
+    return Lists.newArrayList(workerConfig.resolve());
+  }
+
+  public void waitForAndVerifyOutputFiles() throws Exception {
+    AssertWithBackoff asserter = 
AssertWithBackoff.create().logger(log).timeoutMs(60_000)
+        .maxSleepMs(100).backoffFactor(1.5);
+
+    asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for 
job-completion");
+  }
+
+  protected boolean hasExpectedFilesBeenCreated(Void input) {
+    int numOfFiles = getNumOfOutputFiles(this.jobOutputBasePath);
+    return numOfFiles == 1;
+  }
+
+  protected int getNumOfOutputFiles(Path jobOutputDir) {
+    Collection<File> outputFiles = FileUtils
+        .listFiles(jobOutputDir.toFile(), new String[]{"txt"}, true);
+    return outputFiles.size();
+  }
+
+  public void startCluster() throws Exception {
+    this.testingZKServer.start();
+    createHelixCluster();
+    startWorker();
+    startManager();
+  }
+
+  private void startManager() throws Exception {
+    this.manager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME,
+        TestHelper.TEST_APPLICATION_ID,
+        this.managerConfig, Optional.absent());
+
+    this.manager.start();
+  }
+
+  protected void startWorker() throws Exception {
+    this.workers.add(new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, 
WORKER_INSTANCE_0,
+        TestHelper.TEST_APPLICATION_ID, "1",
+        this.workerConfigs.iterator().next(), Optional.absent()));
+
+    // Need to run in another thread since the start call will not return 
until the stop method
+    // is called.
+    Thread workerThread = new Thread(this.workers.iterator().next()::start);
+    workerThread.start();
+  }
+
+  public void shutdownCluster() throws InterruptedException, IOException {
+    workers.forEach(runner->runner.stop());
+    this.manager.stop();
+    this.testingZKServer.close();
+  }
+
+  protected void createHelixCluster() throws Exception {
+    String zkConnectionString = this.managerConfig
+        .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String helix_cluster_name = this.managerConfig
+        .getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, 
helix_cluster_name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
new file mode 100644
index 0000000..22e605d
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.cluster.suite;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.cluster.ClusterIntegrationTest;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.HelixUtils;
+
+/**
+ * A test suite used for {@link 
ClusterIntegrationTest#testDedicatedManagerCluster()}
+ */
+public class IntegrationDedicatedManagerClusterSuite extends 
IntegrationBasicSuite {
+
+  @Override
+  public void createHelixCluster() throws Exception {
+    super.createHelixCluster();
+    String zkConnectionString = managerConfig
+        .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String manager_cluster_name = managerConfig
+        .getString(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, 
manager_cluster_name);
+  }
+
+  @Override
+  protected Config getManagerConfig() {
+    Map<String, String> configMap = new HashMap<>();
+    
configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,
 "true");
+    configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, 
"ManagerCluster");
+    Config config = ConfigFactory.parseMap(configMap);
+    return config.withFallback(super.getManagerConfig()).resolve();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
new file mode 100644
index 0000000..adaf702
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.suite;
+
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.collections.Lists;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigRenderOptions;
+import com.typesafe.config.ConfigSyntax;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.cluster.ClusterIntegrationTest;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinTaskRunner;
+import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
+import org.apache.gobblin.cluster.TaskRunnerSuiteForJobTagTest;
+import org.apache.gobblin.cluster.TestHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+/**
+ * A test suite used for {@link ClusterIntegrationTest#testJobWithTag()}
+ *
+ * Each worker instance will have the tags it can accept.
+ * Each job is associated with a specific tag.
+ * Each job will always go to certain workers as expected due to the tag 
association.
+ */
+@Slf4j
+public class IntegrationJobTagSuite extends IntegrationBasicSuite {
+  private static final String WORKER_INSTANCE_NAME_KEY = 
"worker.instance.name";
+  private static final String WORKER_INSTANCE_1 = "WorkerInstance_1";
+  private static final String WORKER_INSTANCE_2 = "WorkerInstance_2";
+  private static final String WORKER_INSTANCE_3 = "WorkerInstance_3";
+
+  private static final Map<String, List<String>> WORKER_TAG_ASSOCIATION = 
ImmutableMap.of(
+      WORKER_INSTANCE_1, ImmutableList.of("T2", "T7", "T8"),
+      WORKER_INSTANCE_2, ImmutableList.of("T4", "T5", "T6"),
+      WORKER_INSTANCE_3, ImmutableList.of("T1", "T3"));
+
+  private static final Map<String, String> JOB_TAG_ASSOCIATION =  
ImmutableMap.<String, String>builder()
+      .put("jobHello_1", "T2")
+      .put("jobHello_2", "T4")
+      .put("jobHello_3", "T5")
+      .put("jobHello_4", "T6")
+      .put("jobHello_5", "T7")
+      .put("jobHello_6", "T8")
+      .put("jobHello_7", "T1")
+      .put("jobHello_8", "T3")
+      .build();
+
+  public static final Map<String, List<String>> EXPECTED_JOB_NAMES = 
ImmutableMap.of(
+      WORKER_INSTANCE_1, ImmutableList.of("jobHello_1", "jobHello_5", 
"jobHello_6"),
+      WORKER_INSTANCE_2, ImmutableList.of("jobHello_2", "jobHello_3", 
"jobHello_4"),
+      WORKER_INSTANCE_3, ImmutableList.of("jobHello_7", "jobHello_8"));
+
+  private Config addInstanceTags(Config workerConfig, String instanceName, 
List<String> tags) {
+    Map<String, String> configMap = new HashMap<>();
+    if (tags!= null && tags.size() > 0) {
+      configMap.put(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
Joiner.on(',').join(tags));
+      configMap.put(WORKER_INSTANCE_NAME_KEY, instanceName);
+    }
+    return ConfigFactory.parseMap(configMap).withFallback(workerConfig);
+  }
+
+  @Override
+  public  Collection<Config> getWorkerConfigs() {
+    Config parent = super.getWorkerConfigs().iterator().next();
+    Config worker_1 = addInstanceTags(parent, WORKER_INSTANCE_1, 
WORKER_TAG_ASSOCIATION.get(WORKER_INSTANCE_1));
+    Config worker_2 = addInstanceTags(parent, WORKER_INSTANCE_2, 
WORKER_TAG_ASSOCIATION.get(WORKER_INSTANCE_2));
+    Config worker_3 = addInstanceTags(parent, WORKER_INSTANCE_3, 
WORKER_TAG_ASSOCIATION.get(WORKER_INSTANCE_3));
+    worker_1 = addTaskRunnerSuiteBuilder(worker_1);
+    worker_2 = addTaskRunnerSuiteBuilder(worker_2);
+    worker_3 = addTaskRunnerSuiteBuilder(worker_3);
+    return Lists.newArrayList(worker_1, worker_2, worker_3);
+  }
+
+  private Config addTaskRunnerSuiteBuilder(Config workerConfig) {
+    return 
ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
 "JobTagTaskRunnerSuiteBuilder")).withFallback(workerConfig);
+  }
+
+  @Override
+  protected void startWorker() throws Exception {
+    // Each workerConfig corresponds to a worker instance
+    for (Config workerConfig: this.workerConfigs) {
+      GobblinTaskRunner runner = new 
GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, 
workerConfig.getString(WORKER_INSTANCE_NAME_KEY),
+          TestHelper.TEST_APPLICATION_ID, "1",
+          workerConfig, Optional.absent());
+      this.workers.add(runner);
+
+      // Need to run in another thread since the start call will not return 
until the stop method
+      // is called.
+      Thread workerThread = new Thread(runner::start);
+      workerThread.start();
+    }
+  }
+
+  /**
+   * Create different jobs with different tags
+   */
+  @Override
+  protected void copyJobConfFromResource() throws IOException {
+    try (InputStream resourceStream = this.jobConfResourceUrl.openStream()) {
+      Reader reader = new InputStreamReader(resourceStream);
+      Config jobConfig = ConfigFactory.parseReader(reader, 
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF));
+      for(Map.Entry<String, String> assoc: JOB_TAG_ASSOCIATION.entrySet()) {
+        generateJobConf(jobConfig,assoc.getKey(),assoc.getValue());
+      }
+    }
+  }
+
+  private void generateJobConf(Config jobConfig, String jobName, String tag) 
throws IOException {
+    Config newConfig = addJobTag(jobConfig, tag);
+    newConfig = getConfigOverride(newConfig, jobName);
+
+    String targetPath = this.jobConfigPath + "/" + jobName + ".conf";
+    String renderedConfig = 
newConfig.root().render(ConfigRenderOptions.defaults());
+    try (DataOutputStream os = new DataOutputStream(new 
FileOutputStream(targetPath));
+        Writer writer = new OutputStreamWriter(os, Charsets.UTF_8)) {
+      writer.write(renderedConfig);
+    }
+  }
+
+  private Config getConfigOverride(Config config, String jobName) {
+    Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
+        ConfigurationKeys.JOB_NAME_KEY, jobName,
+        ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, this.jobOutputBasePath + 
"/" + jobName))
+        .withFallback(config);
+    return newConfig;
+  }
+
+  private Config addJobTag(Config jobConfig, String jobTag) {
+    return 
ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY,
 jobTag))
+        .withFallback(jobConfig);
+  }
+
+  @Override
+  public void waitForAndVerifyOutputFiles() throws Exception {
+    AssertWithBackoff asserter = 
AssertWithBackoff.create().logger(log).timeoutMs(60_000)
+        .maxSleepMs(100).backoffFactor(1.5);
+
+    asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for 
job-completion");
+  }
+
+  @Override
+  protected boolean hasExpectedFilesBeenCreated(Void input) {
+    int numOfFiles = getNumOfOutputFiles(this.jobOutputBasePath);
+    return numOfFiles == JOB_TAG_ASSOCIATION.size();
+  }
+
+  @Alias("JobTagTaskRunnerSuiteBuilder")
+  public static class JobTagTaskRunnerSuiteBuilder extends 
TaskRunnerSuiteBase.Builder {
+    @Getter
+    private String instanceName;
+    public JobTagTaskRunnerSuiteBuilder(Config config) {
+      super(config);
+      this.instanceName = 
config.getString(IntegrationJobTagSuite.WORKER_INSTANCE_NAME_KEY);
+    }
+
+    @Override
+    public TaskRunnerSuiteBase build() {
+      return new TaskRunnerSuiteForJobTagTest(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationSeparateProcessSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationSeparateProcessSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationSeparateProcessSuite.java
new file mode 100644
index 0000000..9969304
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationSeparateProcessSuite.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.cluster.suite;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.cluster.ClusterIntegrationTest;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+/**
+ * A test suite used for {@link 
ClusterIntegrationTest#testSeparateProcessMode()}
+ */
+public class IntegrationSeparateProcessSuite extends IntegrationBasicSuite {
+
+  @Override
+  protected Collection<Config> getWorkerConfigs() {
+    Map<String, String> configMap = new HashMap<>();
+    
configMap.put(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, 
"true");
+    Config config = ConfigFactory.parseMap(configMap);
+    Config parent = super.getWorkerConfigs().iterator().next();
+    return Lists.newArrayList(config.withFallback(parent).resolve());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/resources/BasicCluster.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/BasicCluster.conf 
b/gobblin-cluster/src/test/resources/BasicCluster.conf
index 92016f8..4cd2b11 100644
--- a/gobblin-cluster/src/test/resources/BasicCluster.conf
+++ b/gobblin-cluster/src/test/resources/BasicCluster.conf
@@ -20,7 +20,3 @@ gobblin.cluster.helix.cluster.name=BasicGobblinCluster
 gobblin.cluster.workDir=/tmp/gobblinClusterBasicTest/
 gobblin.cluster.job.conf.path=${gobblin.cluster.workDir}/jobs
 gobblin.cluster.standaloneMode=true
-gobblin.cluster.job.executeInSchedulingThread=false
-gobblin.cluster.enableTaskInSeparateProcess=false
-gobblin.cluster.task.jvm.options="-Xms10m -Xmx1g -XX:MinHeapFreeRatio=10 
-XX:MaxHeapFreeRatio=20"
-jobexecutor.threadpool.size=20

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/resources/BasicManager.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/BasicManager.conf 
b/gobblin-cluster/src/test/resources/BasicManager.conf
new file mode 100644
index 0000000..52601c8
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/BasicManager.conf
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Cluster / Helix configuration properties
+gobblin.cluster.job.executeInSchedulingThread=false
+gobblin.cluster.enableTaskInSeparateProcess=false
+gobblin.cluster.task.jvm.options="-Xms10m -Xmx1g -XX:MinHeapFreeRatio=10 
-XX:MaxHeapFreeRatio=20"
+jobexecutor.threadpool.size=20

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/resources/BasicWorker.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/BasicWorker.conf 
b/gobblin-cluster/src/test/resources/BasicWorker.conf
new file mode 100644
index 0000000..23d18c0
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/BasicWorker.conf
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Cluster / Helix configuration properties
+taskexecutor.threadpool.size=10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
index 9485f59..cf53d56 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
@@ -29,7 +29,7 @@ import lombok.extern.slf4j.Slf4j;
 
 
 /**
- * An object whcih holds all {@link Throwable}s thrown by {@link 
org.apache.gobblin.runtime.fork.Fork}, so that other
+ * An object which holds all {@link Throwable}s thrown by {@link 
org.apache.gobblin.runtime.fork.Fork}, so that other
  * Gobblin components (like {@link Task}) can have access.
  */
 @Slf4j

Reply via email to