Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 11a1c46ab -> d10bae881


[GOBBLIN-649] Add task driver cluster

Add task driver cluster

Fix comments and add more metrics

Closes #2518 from kyuamazon/td3


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d10bae88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d10bae88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d10bae88

Branch: refs/heads/master
Commit: d10bae881aead4ba9c91b06048434aefd7583285
Parents: 11a1c46
Author: Kuai Yu <[email protected]>
Authored: Mon Dec 10 17:28:41 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Mon Dec 10 17:28:41 2018 -0800

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |  10 +-
 .../gobblin/cluster/GobblinClusterManager.java  |  10 +-
 ...blinHelixDistributeJobExecutionLauncher.java |  28 +++--
 .../cluster/GobblinHelixJobScheduler.java       |  27 +++--
 .../gobblin/cluster/GobblinHelixJobTask.java    |  18 ++--
 .../cluster/GobblinHelixMultiManager.java       |  62 ++++++++++-
 .../GobblinHelixPlanningJobLauncherMetrics.java |  76 ++++++++++++++
 .../gobblin/cluster/GobblinHelixTask.java       |  36 ++++---
 .../cluster/GobblinHelixTaskFactory.java        |  26 +++--
 .../gobblin/cluster/GobblinTaskRunner.java      |  81 +++++++++++----
 .../cluster/HelixRetriggeringJobCallable.java   |  40 +++++--
 .../gobblin/cluster/TaskRunnerSuiteBase.java    |  12 ++-
 .../cluster/TaskRunnerSuiteThreadModel.java     |  11 +-
 .../gobblin/cluster/ClusterIntegrationTest.java |  11 +-
 .../gobblin/cluster/GobblinHelixTaskTest.java   |  20 +++-
 .../cluster/suite/IntegrationBasicSuite.java    |  70 +++++++++++--
 ...egrationDedicatedTaskDriverClusterSuite.java | 103 +++++++++++++++++++
 .../cluster/suite/IntegrationJobTagSuite.java   |  21 +---
 .../src/test/resources/BasicTaskDriver.conf     |  19 ++++
 19 files changed, 561 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 873793f..dba2a42 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -51,12 +51,18 @@ public class GobblinClusterConfigurationKeys {
   public static final boolean DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED = false;
   public static final String DISTRIBUTED_JOB_LAUNCHER_BUILDER = 
GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherBuilder";
 
-
   // Helix configuration properties.
+  public static final String DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "dedicatedJobClusterController.enabled";
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helix.cluster.name";
+
   public static final String MANAGER_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "manager.cluster.name";
   public static final String DEDICATED_MANAGER_CLUSTER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "dedicatedManagerCluster.enabled";
-  public static final String DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "dedicatedJobClusterController.enabled";
+
+  public static final String DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED 
= GOBBLIN_CLUSTER_PREFIX + "dedicatedTaskDriverClusterController.enabled";
+  public static final String TASK_DRIVER_CLUSTER_NAME_KEY = 
GOBBLIN_CLUSTER_PREFIX + "taskDriver.cluster.name";
+  public static final String DEDICATED_TASK_DRIVER_CLUSTER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "dedicatedTaskDriverCluster.enabled";
+  public static final String TASK_DRIVER_ENABLED = GOBBLIN_CLUSTER_PREFIX + 
"taskDriver.enabled";
+
   public static final String ZK_CONNECTION_STRING_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "zk.connection.string";
   public static final String WORK_UNIT_FILE_PATH = GOBBLIN_CLUSTER_PREFIX + 
"work.unit.file.path";
   public static final String HELIX_INSTANCE_NAME_OPTION_NAME = 
"helix_instance_name";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 437da37..1722cc3 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -315,8 +315,14 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
   private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config 
config, Path appWorkDir,
       List<? extends Tag<?>> metadataTags, SchedulerService schedulerService) 
throws Exception {
     Properties properties = ConfigUtils.configToProperties(config);
-    return new GobblinHelixJobScheduler(properties, 
this.multiManager.getJobClusterHelixManager(), this.eventBus, appWorkDir, 
metadataTags,
-        schedulerService, this.jobCatalog);
+    return new GobblinHelixJobScheduler(properties,
+        this.multiManager.getJobClusterHelixManager(),
+        this.multiManager.getTaskDriverHelixManager(),
+        this.eventBus,
+        appWorkDir,
+        metadataTags,
+        schedulerService,
+        this.jobCatalog);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 609639e..1d592c4 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -82,11 +82,12 @@ import org.apache.gobblin.util.PropertiesUtils;
 @Slf4j
 class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher, Closeable {
 
-  protected HelixManager helixManager;
+  protected HelixManager planningJobHelixManager;
   protected TaskDriver helixTaskDriver;
   protected Properties sysProps;
   protected Properties jobPlanningProps;
   protected HelixJobsMapping jobsMapping;
+  protected GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
 
   protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps.";
 
@@ -104,8 +105,12 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
   private DistributeJobMonitor jobMonitor;
 
   public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws 
Exception {
-    this.helixManager = builder.manager;
-    this.helixTaskDriver = new TaskDriver(this.helixManager);
+    if (builder.taskDriverHelixManager.isPresent()) {
+      this.planningJobHelixManager = builder.taskDriverHelixManager.get();
+    } else {
+      this.planningJobHelixManager = builder.jobHelixManager;
+    }
+    this.helixTaskDriver = new TaskDriver(this.planningJobHelixManager);
     this.sysProps = builder.sysProps;
     this.jobPlanningProps = builder.jobPlanningProps;
     this.jobSubmitted = false;
@@ -147,8 +152,10 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
   public static class Builder {
     Properties sysProps;
     Properties jobPlanningProps;
-    HelixManager manager;
+    HelixManager jobHelixManager;
+    Optional<HelixManager> taskDriverHelixManager;
     Path appWorkDir;
+    GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
     public GobblinHelixDistributeJobExecutionLauncher build() throws Exception 
{
       return new GobblinHelixDistributeJobExecutionLauncher(this);
     }
@@ -217,12 +224,12 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
    * @param jobConfigBuilder A job config builder which contains a single task.
    */
   private void submitJobToHelix(String jobName, String jobId, 
JobConfig.Builder jobConfigBuilder) throws Exception {
-    TaskDriver taskDriver = new TaskDriver(this.helixManager);
+    TaskDriver taskDriver = new TaskDriver(this.planningJobHelixManager);
     HelixUtils.submitJobToWorkFlow(jobConfigBuilder,
         jobName,
         jobId,
         taskDriver,
-        this.helixManager,
+        this.planningJobHelixManager,
         this.workFlowExpiryTimeSeconds);
     this.jobSubmitted = true;
   }
@@ -243,7 +250,10 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
       JobConfig.Builder builder = createJobBuilder(this.jobPlanningProps);
       try {
         submitJobToHelix(planningId, planningId, builder);
-        return waitForJobCompletion(planningId, planningId);
+        long startTime = System.currentTimeMillis();
+        DistributeJobResult rst = waitForJobCompletion(planningId, planningId);
+        
GobblinHelixDistributeJobExecutionLauncher.this.planningJobLauncherMetrics.updateTimeForHelixWait(startTime);
+        return rst;
       } catch (Exception e) {
         log.error(planningId + " is not able to submit.");
         return new DistributeJobResult(false);
@@ -259,14 +269,14 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
 
     try {
       HelixUtils.waitJobCompletion(
-          GobblinHelixDistributeJobExecutionLauncher.this.helixManager,
+          
GobblinHelixDistributeJobExecutionLauncher.this.planningJobHelixManager,
           workFlowName,
           jobName,
           timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty());
       return getResultFromUserContent();
     } catch (TimeoutException te) {
       HelixUtils.handleJobTimeout(workFlowName, jobName,
-          helixManager, this, null);
+          planningJobHelixManager, this, null);
       return new DistributeJobResult(false);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 2e488c5..134d382 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -21,6 +21,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -77,7 +78,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinHelixJobScheduler.class);
 
   private final Properties properties;
-  private final HelixManager helixManager;
+  private final HelixManager jobHelixManager;
+  private final Optional<HelixManager> taskDriverHelixManager;
   private final EventBus eventBus;
   private final Path appWorkDir;
   private final List<? extends Tag<?>> metadataTags;
@@ -87,11 +89,13 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
   final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics;
   final GobblinHelixJobLauncherMetrics launcherMetrics;
+  final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
 
   private boolean startServicesCompleted;
 
   public GobblinHelixJobScheduler(Properties properties,
-                                  HelixManager helixManager,
+                                  HelixManager jobHelixManager,
+                                  Optional<HelixManager> 
taskDriverHelixManager,
                                   EventBus eventBus,
                                   Path appWorkDir, List<? extends Tag<?>> 
metadataTags,
                                   SchedulerService schedulerService,
@@ -99,7 +103,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
     super(properties, schedulerService);
     this.properties = properties;
-    this.helixManager = helixManager;
+    this.jobHelixManager = jobHelixManager;
+    this.taskDriverHelixManager = taskDriverHelixManager;
     this.eventBus = eventBus;
     this.jobRunningMap = new ConcurrentHashMap<>();
     this.appWorkDir = appWorkDir;
@@ -119,12 +124,16 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
                                                                    
this.metricContext,
                                                                    
metricsWindowSizeInMin);
 
+    this.planningJobLauncherMetrics = new 
GobblinHelixPlanningJobLauncherMetrics("planningLauncherInScheduler",
+                                                                          
this.metricContext,
+                                                                          
metricsWindowSizeInMin);
+
     this.startServicesCompleted = false;
   }
 
   @Override
   public Collection<StandardMetrics> getStandardMetricsCollection() {
-    return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics);
+    return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics, 
this.planningJobLauncherMetrics);
   }
 
   @Override
@@ -162,8 +171,10 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         this.properties,
         jobProps,
         jobListener,
+        this.planningJobLauncherMetrics,
         this.appWorkDir,
-        this.helixManager).call();
+        this.jobHelixManager,
+        this.taskDriverHelixManager).call();
   }
 
   @Override
@@ -173,7 +184,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     combinedProps.putAll(properties);
     combinedProps.putAll(jobProps);
 
-    return new GobblinHelixJobLauncher(combinedProps, this.helixManager, 
this.appWorkDir, this.metadataTags, this.jobRunningMap);
+    return new GobblinHelixJobLauncher(combinedProps, this.jobHelixManager, 
this.appWorkDir, this.metadataTags, this.jobRunningMap);
   }
 
   public Future<?> scheduleJobImmediately(Properties jobProps, JobListener 
jobListener) {
@@ -181,8 +192,10 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         this.properties,
         jobProps,
         jobListener,
+        this.planningJobLauncherMetrics,
         this.appWorkDir,
-        this.helixManager);
+        this.jobHelixManager,
+        this.taskDriverHelixManager);
 
     final Future<?> future = this.jobExecutor.submit(retriggeringJob);
     return new Future() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 0266413..9447b94 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -60,8 +60,10 @@ class GobblinHelixJobTask implements Task {
   private final Config sysConfig;
   private final Properties jobPlusSysConfig;
   private final HelixJobsMapping jobsMapping;
+  private final String applicationName;
+  private final String instanceName;
   private final String planningJobId;
-  private final HelixManager helixManager;
+  private final HelixManager jobHelixManager;
   private final Path appWorkDir;
   private final List<? extends Tag<?>> metadataTags;
   private GobblinHelixJobLauncher launcher;
@@ -73,10 +75,12 @@ class GobblinHelixJobTask implements Task {
                               TaskRunnerSuiteBase.Builder builder,
                               GobblinHelixJobLauncherMetrics launcherMetrics,
                               GobblinHelixJobTaskMetrics jobTaskMetrics) {
+    this.applicationName = builder.getApplicationName();
+    this.instanceName = builder.getInstanceName();
     this.jobTaskMetrics = jobTaskMetrics;
     this.taskConfig = context.getTaskConfig();
     this.sysConfig = builder.getConfig();
-    this.helixManager = builder.getHelixManager();
+    this.jobHelixManager = builder.getJobHelixManager();
     this.jobPlusSysConfig = ConfigUtils.configToProperties(sysConfig);
     this.jobLauncherListener = new 
GobblinHelixJobLauncherListener(launcherMetrics);
 
@@ -124,7 +128,7 @@ class GobblinHelixJobTask implements Task {
   private GobblinHelixJobLauncher createJobLauncher()
       throws Exception {
     return new GobblinHelixJobLauncher(jobPlusSysConfig,
-        this.helixManager,
+        this.jobHelixManager,
         this.appWorkDir,
         this.metadataTags,
         new ConcurrentHashMap<>());
@@ -135,7 +139,7 @@ class GobblinHelixJobTask implements Task {
    */
   @Override
   public TaskResult run() {
-    log.info("Running planning job {}", this.planningJobId);
+    log.info("Running planning job {} [{} {}]", this.planningJobId, 
this.applicationName, this.instanceName);
     
this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
 
     try (Closer closer = Closer.create()) {
@@ -156,12 +160,12 @@ class GobblinHelixJobTask implements Task {
         Optional<String> actualJobIdFromStateStore = 
this.jobsMapping.getActualJobId(jobName);
         if (actualJobIdFromStateStore.isPresent()) {
           String previousActualJobId = actualJobIdFromStateStore.get();
-          if (HelixUtils.isJobFinished(previousActualJobId, 
previousActualJobId, this.helixManager)) {
+          if (HelixUtils.isJobFinished(previousActualJobId, 
previousActualJobId, this.jobHelixManager)) {
             log.info("Previous actual job {} [plan: {}] finished, will launch 
a new job.", previousActualJobId, this.planningJobId);
           } else {
             log.info("Previous actual job {} [plan: {}] not finished, kill it 
now.", previousActualJobId, this.planningJobId);
             try {
-              HelixUtils.deleteWorkflow(previousActualJobId, 
this.helixManager, timeOut);
+              HelixUtils.deleteWorkflow(previousActualJobId, 
this.jobHelixManager, timeOut);
             } catch (HelixException e) {
               log.error("Helix cannot delete previous actual job id {} within 
5 min.", previousActualJobId);
               return new TaskResult(TaskResult.Status.FAILED, 
ExceptionUtils.getFullStackTrace(e));
@@ -184,9 +188,11 @@ class GobblinHelixJobTask implements Task {
         }
       }
     } catch (Exception e) {
+      log.info("Failing planning job {}", this.planningJobId);
       return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for 
job " + planningJobId + ":" + ExceptionUtils
           .getFullStackTrace(e));
     }
+    log.info("Completing planning job {}", this.planningJobId);
     return new TaskResult(TaskResult.Status.COMPLETED, "");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index a7c5e2e..81ed7ce 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -88,6 +88,13 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
   private HelixManager jobClusterHelixManager = null;
 
   /**
+   * Helix manager to handle planning job distribution.
+   * Corresponds to cluster with key name {@link 
GobblinClusterConfigurationKeys#TASK_DRIVER_CLUSTER_NAME_KEY}.
+   */
+  @Getter
+  private Optional<HelixManager> taskDriverHelixManager = Optional.empty();
+
+  /**
    * Helix controller for job distribution. Effective only iff below two 
conditions are established:
    * 1. In {@link GobblinHelixMultiManager#dedicatedManagerCluster} mode.
    * 2. {@link GobblinHelixMultiManager#dedicatedJobClusterController} is 
turned on.
@@ -96,16 +103,31 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
   private Optional<HelixManager> jobClusterController = Optional.empty();
 
   /**
+   * Helix controller for planning job distribution. Effective only iff below 
two conditions are established:
+   * 1. In {@link GobblinHelixMultiManager#dedicatedManagerCluster} mode.
+   * 2. {@link GobblinHelixMultiManager#dedicatedTaskDriverCluster} is turned 
on.
+   * Typically used for unit test and local deployment.
+   */
+  private Optional<HelixManager> taskDriverClusterController = 
Optional.empty();
+
+  /**
    * Separate manager cluster and job distribution cluster iff this flag is 
turned on. Otherwise {@link GobblinHelixMultiManager#jobClusterHelixManager}
    * is same as {@link GobblinHelixMultiManager#managerClusterHelixManager}.
    */
   private boolean dedicatedManagerCluster = false;
 
+  private boolean dedicatedTaskDriverCluster = false;
+
   /**
    * Create a dedicated controller for job distribution.
    */
   private boolean dedicatedJobClusterController = true;
 
+  /**
+   * Create a dedicated controller for planning job distribution.
+   */
+  private boolean dedicatedTaskDriverClusterController = true;
+
   @Getter
   boolean isLeader = false;
   boolean isStandaloneMode = false;
@@ -131,6 +153,8 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
     this.metrics = new HelixManagerMetrics(this.metricContext, this.config);
     this.dedicatedManagerCluster = ConfigUtils.getBoolean(config,
        
GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,false);
+    this.dedicatedTaskDriverCluster = ConfigUtils.getBoolean(config,
+        GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED, 
false);
     this.userDefinedMessageHandlerFactory = 
messageHandlerFactoryFunction.apply(null);
     initialize();
   }
@@ -180,6 +204,26 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
             .buildHelixManager(this.config, zkConnectionString, 
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
                 InstanceType.CONTROLLER));
       }
+
+      // This will create a dedicated controller for planning job distribution
+      if (this.dedicatedTaskDriverCluster) {
+        // This will create a Helix administrator to dispatch jobs to ZooKeeper
+        this.taskDriverHelixManager = 
Optional.of(buildHelixManager(this.config,
+            zkConnectionString,
+            GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
+            InstanceType.ADMINISTRATOR));
+
+        this.dedicatedTaskDriverClusterController = ConfigUtils.getBoolean(
+            this.config,
+            
GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED,
+            true);
+
+        if (this.dedicatedTaskDriverClusterController) {
+          this.taskDriverClusterController = 
Optional.of(GobblinHelixMultiManager
+              .buildHelixManager(this.config, zkConnectionString, 
GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
+                  InstanceType.CONTROLLER));
+        }
+      }
     } else {
       log.info("We will use same cluster to manage GobblinClusterManager and 
job distribution.");
       // This will create and register a Helix controller in ZooKeeper
@@ -200,7 +244,15 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
         if (jobClusterController.isPresent()) {
           this.jobClusterController.get().connect();
         }
+        if (this.dedicatedTaskDriverCluster) {
+          if (taskDriverClusterController.isPresent()) {
+            this.taskDriverClusterController.get().connect();
+          }
+        }
         this.jobClusterHelixManager.connect();
+        if (this.taskDriverHelixManager.isPresent()) {
+          this.taskDriverHelixManager.get().connect();
+        }
       }
 
       this.jobClusterHelixManager.addLiveInstanceChangeListener(new 
GobblinLiveInstanceChangeListener());
@@ -239,10 +291,17 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
         this.jobClusterHelixManager.disconnect();
       }
 
+      if (taskDriverHelixManager.isPresent()) {
+        this.taskDriverHelixManager.get().disconnect();
+      }
+
       if (jobClusterController.isPresent() && 
jobClusterController.get().isConnected()) {
         this.jobClusterController.get().disconnect();
       }
 
+      if (taskDriverClusterController.isPresent() && 
taskDriverClusterController.get().isConnected()) {
+        this.taskDriverClusterController.get().disconnect();
+      }
     }
   }
 
@@ -269,7 +328,8 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
   void handleLeadershipChange(NotificationContext changeContext) {
     this.metrics.clusterLeadershipChange.update(1);
     if (this.managerClusterHelixManager.isLeader()) {
-      // can get multiple notifications on a leadership change, so only start 
the application launcher the first time
+      // can get multiple notifications on a leadership change,
+      // so only start the application launcher the first time
       // the notification is received
       log.info("Leader notification for {} isLeader {} HM.isLeader {}",
           managerClusterHelixManager.getInstanceName(),

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
new file mode 100644
index 0000000..a2e0991
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.metrics.MetricContext;
+
+
+public class GobblinHelixPlanningJobLauncherMetrics extends 
StandardMetricsBridge.StandardMetrics {
+  private final String metricsName;
+
+  public static final String TIMER_FOR_COMPLETED_PLANNING_JOBS = 
"timeForCompletedPlanningJobs";
+  public static final String TIMER_FOR_FAILED_PLANNING_JOBS = 
"timeForFailedPlanningJobs";
+  public static final String TIMER_FOR_HELIX_WAIT = "timeForHelixWait";
+
+  final ContextAwareTimer timeForCompletedPlanningJobs;
+  final ContextAwareTimer timeForFailedPlanningJobs;
+  final ContextAwareTimer timeForHelixWait;
+
+  public GobblinHelixPlanningJobLauncherMetrics(String metricsName,
+      final MetricContext metricContext,
+      int windowSizeInMin) {
+
+    this.metricsName = metricsName;
+
+    this.timeForCompletedPlanningJobs = 
metricContext.contextAwareTimer(TIMER_FOR_COMPLETED_PLANNING_JOBS, 
windowSizeInMin, TimeUnit.MINUTES);
+    this.timeForFailedPlanningJobs = 
metricContext.contextAwareTimer(TIMER_FOR_FAILED_PLANNING_JOBS, 
windowSizeInMin, TimeUnit.MINUTES);
+    this.timeForHelixWait = 
metricContext.contextAwareTimer(TIMER_FOR_HELIX_WAIT, windowSizeInMin, 
TimeUnit.MINUTES);
+
+    this.contextAwareMetrics.add(timeForCompletedPlanningJobs);
+    this.contextAwareMetrics.add(timeForFailedPlanningJobs);
+    this.contextAwareMetrics.add(timeForHelixWait);
+  }
+
+  public void updateTimeForCompletedPlanningJobs(long startTime) {
+    Instrumented.updateTimer(
+        com.google.common.base.Optional.of(this.timeForCompletedPlanningJobs),
+        System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+  }
+
+  public void updateTimeForFailedPlanningJobs(long startTime) {
+    Instrumented.updateTimer(
+        com.google.common.base.Optional.of(this.timeForFailedPlanningJobs),
+        System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+  }
+
+  public void updateTimeForHelixWait(long startTime) {
+    Instrumented.updateTimer(
+        com.google.common.base.Optional.of(this.timeForHelixWait),
+        System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public String getName() {
+    return this.metricsName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index a2516eb..bd1d3b2 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -62,6 +62,9 @@ import org.apache.gobblin.util.Id;
 public class GobblinHelixTask implements Task {
 
   private final TaskConfig taskConfig;
+  private final String applicationName;
+  private final String instanceName;
+
   private String jobName;
   private String jobId;
   private String jobKey;
@@ -70,19 +73,27 @@ public class GobblinHelixTask implements Task {
 
   private SingleTask task;
 
-  public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem 
fs, Path appWorkDir,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores)
-      throws IOException {
+  public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
+                          TaskCallbackContext taskCallbackContext,
+                          TaskAttemptBuilder taskAttemptBuilder,
+                          StateStores stateStores)  throws IOException {
 
     this.taskConfig = taskCallbackContext.getTaskConfig();
+    this.applicationName = builder.getApplicationName();
+    this.instanceName = builder.getInstanceName();
     getInfoFromTaskConfig();
 
-    Path jobStateFilePath =
-        
GobblinClusterUtils.getJobStateFilePath(stateStores.haveJobStateStore(), 
appWorkDir, this.jobId);
-
-    this.task =
-        new SingleTask(this.jobId, workUnitFilePath, jobStateFilePath, fs, 
taskAttemptBuilder,
-            stateStores);
+    Path jobStateFilePath = GobblinClusterUtils
+        .getJobStateFilePath(stateStores.haveJobStateStore(),
+                             builder.getAppWorkPath(),
+                             this.jobId);
+
+    this.task = new SingleTask(this.jobId,
+                               this.workUnitFilePath,
+                               jobStateFilePath,
+                               builder.getFs(),
+                               taskAttemptBuilder,
+                               stateStores);
   }
 
   private void getInfoFromTaskConfig() {
@@ -97,18 +108,19 @@ public class GobblinHelixTask implements Task {
 
   @Override
   public TaskResult run() {
-    log.info("Actual task {} started.", this.taskId);
+    log.info("Actual task {} started. [{} {}]", this.taskId, 
this.applicationName, this.instanceName);
     try (Closer closer = Closer.create()) {
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, 
this.jobName));
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, 
this.jobKey));
       this.task.run();
-      log.info("Actual task {} finished.", this.taskId);
+      log.info("Actual task {} completed.", this.taskId);
       return new TaskResult(TaskResult.Status.COMPLETED, "");
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
+      log.error("Actual task {} interrupted.", this.taskId);
       return new TaskResult(TaskResult.Status.CANCELED, "");
     } catch (Throwable t) {
-      log.error("GobblinHelixTask " + taskId + " failed due to " + 
t.getMessage(), t);
+      log.error("Actual task {} failed due to {}", this.taskId, 
t.getMessage());
       return new TaskResult(TaskResult.Status.FAILED, 
Throwables.getStackTraceAsString(t));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
index 2fa845c..14d22b4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.Task;
@@ -53,6 +52,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
 
   private final Optional<ContainerMetrics> containerMetrics;
   private final HelixManager helixManager;
+  private TaskRunnerSuiteBase.Builder builder;
 
   /**
    * A {@link Counter} to count the number of new {@link GobblinHelixTask}s 
that are created.
@@ -60,15 +60,17 @@ public class GobblinHelixTaskFactory implements TaskFactory 
{
   private final Optional<Counter> newTasksCounter;
   private final TaskExecutor taskExecutor;
   private final TaskStateTracker taskStateTracker;
-  private final FileSystem fs;
   private final Path appWorkDir;
   private final StateStores stateStores;
   private final TaskAttemptBuilder taskAttemptBuilder;
 
-  public GobblinHelixTaskFactory(Optional<ContainerMetrics> containerMetrics, 
TaskExecutor taskExecutor,
-      TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, 
Config config, HelixManager helixManager) {
-    this.containerMetrics = containerMetrics;
-    this.helixManager = helixManager;
+  public GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder builder,
+                                 TaskExecutor taskExecutor,
+                                 TaskStateTracker taskStateTracker,
+                                 Config stateStoreConfig) {
+    this.builder = builder;
+    this.containerMetrics = builder.getContainerMetrics();
+    this.helixManager = builder.getJobHelixManager();
     if (this.containerMetrics.isPresent()) {
       this.newTasksCounter = 
Optional.of(this.containerMetrics.get().getCounter(GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER));
     } else {
@@ -76,10 +78,12 @@ public class GobblinHelixTaskFactory implements TaskFactory 
{
     }
     this.taskExecutor = taskExecutor;
     this.taskStateTracker = taskStateTracker;
-    this.fs = fs;
-    this.appWorkDir = appWorkDir;
-    this.stateStores = new StateStores(config, appWorkDir, 
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME,
-        appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, 
appWorkDir,
+
+    this.appWorkDir = builder.getAppWorkPath();
+    this.stateStores = new StateStores(stateStoreConfig,
+        appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME,
+        appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME,
+        appWorkDir,
         GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
     this.taskAttemptBuilder = createTaskAttemptBuilder();
   }
@@ -98,7 +102,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
       if (this.newTasksCounter.isPresent()) {
         this.newTasksCounter.get().inc();
       }
-      return new GobblinHelixTask(context, this.fs, this.appWorkDir, 
this.taskAttemptBuilder, this.stateStores);
+      return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, 
this.stateStores);
     } catch (IOException ioe) {
       LOGGER.error("Failed to create a new GobblinHelixTask", ioe);
       throw Throwables.propagate(ioe);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 63e7fd2..9f353ed 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -121,7 +121,9 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
 
   private final String clusterName;
 
-  private HelixManager helixManager;
+  private HelixManager jobHelixManager;
+
+  private Optional<HelixManager> taskDriverHelixManager = Optional.absent();
 
   private final ServiceManager serviceManager;
 
@@ -144,6 +146,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   private final String applicationName;
   private final String applicationId;
   private final Path appWorkPath;
+  private boolean isTaskDriver;
+  private boolean dedicatedTaskDriverCluster;
 
   private final Collection<StandardMetricsBridge.StandardMetrics> 
metricsCollection;
 
@@ -153,12 +157,13 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
       String taskRunnerId,
       Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
-
+    this.isTaskDriver = ConfigUtils.getBoolean(config, 
GobblinClusterConfigurationKeys.TASK_DRIVER_ENABLED,false);
     this.helixInstanceName = helixInstanceName;
     this.taskRunnerId = taskRunnerId;
     this.applicationName = applicationName;
     this.applicationId = applicationId;
-
+    this.dedicatedTaskDriverCluster = ConfigUtils.getBoolean(config,
+        GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED, 
false);
     Configuration conf = HadoopUtils.newConfiguration();
     this.fs = buildFileSystem(config, conf);
     this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
@@ -180,9 +185,10 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
         .setContainerMetrics(this.containerMetrics)
         .setFileSystem(this.fs)
-        .setHelixManager(this.helixManager)
+        .setJobHelixManager(this.jobHelixManager)
         .setApplicationId(applicationId)
         .setApplicationName(applicationName)
+        .setInstanceName(helixInstanceName)
         .build();
 
     this.taskStateModelFactory = 
createTaskStateModelFactory(suite.getTaskFactoryMap());
@@ -196,7 +202,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
       this.serviceManager = new ServiceManager(services);
     }
 
-    logger.debug("GobblinTaskRunner: applicationName {}, helixInstanceName {}, 
applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
+    logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName 
{}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
+        this.isTaskDriver?"taskDriver" : "worker",
         applicationName,
         helixInstanceName,
         applicationId,
@@ -215,14 +222,37 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
         
this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
     logger.info("Using ZooKeeper connection string: " + zkConnectionString);
 
-    this.helixManager = HelixManagerFactory.getZKHelixManager(
-        this.clusterName, this.helixInstanceName, InstanceType.PARTICIPANT, 
zkConnectionString);
+    if (this.isTaskDriver && this.dedicatedTaskDriverCluster) {
+      // This will create a Helix manager to receive the planning job
+      this.taskDriverHelixManager = 
Optional.of(HelixManagerFactory.getZKHelixManager(
+          ConfigUtils.getString(this.config, 
GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""),
+          this.helixInstanceName,
+          InstanceType.PARTICIPANT,
+          zkConnectionString));
+      this.jobHelixManager = HelixManagerFactory.getZKHelixManager(
+          this.clusterName,
+          this.helixInstanceName,
+          InstanceType.ADMINISTRATOR,
+          zkConnectionString);
+    } else {
+      this.jobHelixManager = HelixManagerFactory.getZKHelixManager(
+          this.clusterName,
+          this.helixInstanceName,
+          InstanceType.PARTICIPANT,
+          zkConnectionString);
+    }
+  }
+
+  private HelixManager getReceiverManager() {
+    return taskDriverHelixManager.isPresent()?taskDriverHelixManager.get()
+        : this.jobHelixManager;
   }
 
   private TaskStateModelFactory createTaskStateModelFactory(Map<String, 
TaskFactory> taskFactoryMap) {
+    HelixManager receiverManager = getReceiverManager();
     TaskStateModelFactory taskStateModelFactory =
-        new TaskStateModelFactory(this.helixManager, taskFactoryMap);
-    this.helixManager.getStateMachineEngine()
+        new TaskStateModelFactory(receiverManager, taskFactoryMap);
+    receiverManager.getStateMachineEngine()
         .registerStateModelFactory("Task", taskStateModelFactory);
     return taskStateModelFactory;
   }
@@ -316,13 +346,16 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   @VisibleForTesting
   void connectHelixManager() {
     try {
-      this.helixManager.connect();
-      this.helixManager.getMessagingService()
+      this.jobHelixManager.connect();
+      this.jobHelixManager.getMessagingService()
           
.registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
               new ParticipantShutdownMessageHandlerFactory());
-      this.helixManager.getMessagingService()
+      this.jobHelixManager.getMessagingService()
           
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
               getUserDefinedMessageHandlerFactory());
+      if (this.taskDriverHelixManager.isPresent()) {
+        this.taskDriverHelixManager.get().connect();
+      }
     } catch (Exception e) {
       logger.error("HelixManager failed to connect", e);
       throw Throwables.propagate(e);
@@ -335,12 +368,16 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
    * the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance 
with EXAMPLE_INSTANCE_TAG was found.
    */
   private void addInstanceTags() {
-    if (this.helixManager.isConnected()) {
-      List<String> tags = ConfigUtils.getStringList(this.config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
-      logger.info("Adding tags binding " + tags);
-      tags.forEach(tag -> 
helixManager.getClusterManagmentTool().addInstanceTag(this.clusterName, 
this.helixInstanceName, tag));
-      logger.info("Actual tags binding " + 
helixManager.getClusterManagmentTool()
-          .getInstanceConfig(this.clusterName, 
this.helixInstanceName).getTags());
+    List<String> tags = ConfigUtils.getStringList(this.config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
+    HelixManager receiverManager = getReceiverManager();
+    if (receiverManager.isConnected()) {
+      if (!tags.isEmpty()) {
+        logger.info("Adding tags binding " + tags);
+        tags.forEach(tag -> receiverManager.getClusterManagmentTool()
+            .addInstanceTag(this.clusterName, this.helixInstanceName, tag));
+        logger.info("Actual tags binding " + 
receiverManager.getClusterManagmentTool()
+            .getInstanceConfig(this.clusterName, 
this.helixInstanceName).getTags());
+      }
     }
   }
 
@@ -356,8 +393,12 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
 
   @VisibleForTesting
   void disconnectHelixManager() {
-    if (this.helixManager.isConnected()) {
-      this.helixManager.disconnect();
+    if (this.jobHelixManager.isConnected()) {
+      this.jobHelixManager.disconnect();
+    }
+
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().disconnect();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index f9c70f7..4cda2ed 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.cluster;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
@@ -30,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.api.JobExecutionMonitor;
@@ -84,8 +86,10 @@ class HelixRetriggeringJobCallable implements Callable {
   private final Properties sysProps;
   private final Properties jobProps;
   private final JobListener jobListener;
+  private final GobblinHelixPlanningJobLauncherMetrics 
planningJobLauncherMetrics;
   private final Path appWorkDir;
-  private final HelixManager helixManager;
+  private final HelixManager jobHelixManager;
+  private final Optional<HelixManager> taskDriverHelixManager;
   protected HelixJobsMapping jobsMapping;
   private GobblinHelixJobLauncher currentJobLauncher = null;
   private JobExecutionMonitor currentJobMonitor = null;
@@ -96,14 +100,18 @@ class HelixRetriggeringJobCallable implements Callable {
       Properties sysProps,
       Properties jobProps,
       JobListener jobListener,
+      GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics,
       Path appWorkDir,
-      HelixManager helixManager) {
+      HelixManager jobHelixManager,
+      Optional<HelixManager> taskDriverHelixManager) {
     this.jobScheduler = jobScheduler;
     this.sysProps = sysProps;
     this.jobProps = jobProps;
     this.jobListener = jobListener;
+    this.planningJobLauncherMetrics = planningJobLauncherMetrics;
     this.appWorkDir = appWorkDir;
-    this.helixManager = helixManager;
+    this.jobHelixManager = jobHelixManager;
+    this.taskDriverHelixManager = taskDriverHelixManager;
     this.isDistributeJobEnabled = isDistributeJobEnabled();
     this.jobsMapping = new 
HelixJobsMapping(ConfigUtils.propertiesToConfig(sysProps),
                                             
PathUtils.getRootPath(appWorkDir).toUri(),
@@ -169,6 +177,7 @@ class HelixRetriggeringJobCallable implements Callable {
    * @see {@link GobblinHelixJobTask#run()} for the task driver logic.
    */
   private void runJobExecutionLauncher() throws JobException {
+    long startTime = 0;
     try {
       String builderStr = 
jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
           GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
@@ -179,7 +188,10 @@ class HelixRetriggeringJobCallable implements Callable {
 
       if (planningJobIdFromStore.isPresent()) {
         String previousPlanningJobId = planningJobIdFromStore.get();
-        if (HelixUtils.isJobFinished(previousPlanningJobId, 
previousPlanningJobId, this.helixManager)) {
+        HelixManager planningJobManager = 
this.taskDriverHelixManager.isPresent()?
+            this.taskDriverHelixManager.get() : this.jobHelixManager;
+
+        if (HelixUtils.isJobFinished(previousPlanningJobId, 
previousPlanningJobId, planningJobManager)) {
           log.info("Previous planning job {} has reached to the final state. 
Start a new one.", previousPlanningJobId);
         } else {
           log.info("Previous planning job {} has not finished yet. Skip it.", 
previousPlanningJobId);
@@ -205,22 +217,34 @@ class HelixRetriggeringJobCallable implements Callable {
 
       builder.setSysProps(this.sysProps);
       builder.setJobPlanningProps(jobPlanningProps);
-      builder.setManager(this.helixManager);
+      builder.setJobHelixManager(this.jobHelixManager);
+      builder.setTaskDriverHelixManager(this.taskDriverHelixManager);
       builder.setAppWorkDir(this.appWorkDir);
+      builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
 
       try (Closer closer = Closer.create()) {
+        log.info("Planning job {} started.", planningId);
         GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
         closer.register(launcher);
         this.jobsMapping.setPlanningJobId(jobName, planningId);
+        startTime = System.currentTimeMillis();
         this.currentJobMonitor = launcher.launchJob(null);
         this.currentJobMonitor.get();
         this.currentJobMonitor = null;
+        log.info("Planning job {} finished.", planningId);
+        
this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
       } catch (Throwable t) {
-        throw new JobException("Failed to launch and run job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t);
+        if (startTime != 0) {
+          
this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
+        }
+        throw new JobException("Failed to launch and run planning job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t);
       }
     } catch (Exception e) {
-      log.error("Failed to run job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
-      throw new JobException("Failed to run job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+      if (startTime != 0) {
+        
this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
+      }
+      log.error("Failed to run planning job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+      throw new JobException("Failed to run planning job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 182cd07..1ae8639 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -85,19 +85,20 @@ public abstract class TaskRunnerSuiteBase {
   @Getter
   public static class Builder {
     private Config config;
-    private HelixManager helixManager;
+    private HelixManager jobHelixManager;
     private Optional<ContainerMetrics> containerMetrics;
     private FileSystem fs;
     private Path appWorkPath;
     private String applicationId;
     private String applicationName;
+    private String instanceName;
 
     public Builder(Config config) {
       this.config = config;
     }
 
-    public Builder setHelixManager(HelixManager manager) {
-      this.helixManager = manager;
+    public Builder setJobHelixManager(HelixManager jobHelixManager) {
+      this.jobHelixManager = jobHelixManager;
       return this;
     }
 
@@ -106,6 +107,11 @@ public abstract class TaskRunnerSuiteBase {
       return this;
     }
 
+    public Builder setInstanceName(String instanceName) {
+      this.instanceName = instanceName;
+      return this;
+    }
+
     public Builder setApplicationId(String applicationId) {
       this.applicationId = applicationId;
       return this;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index 6080e1f..92d3427 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -89,12 +89,9 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase 
{
     services.add(new JMXReportingService(
         ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
 
-    return new GobblinHelixTaskFactory(builder.getContainerMetrics(),
-            taskExecutor,
-            taskStateTracker,
-            builder.getFs(),
-            builder.getAppWorkPath(),
-            stateStoreJobConfig,
-            builder.getHelixManager());
+    return new GobblinHelixTaskFactory(builder,
+                                       taskExecutor,
+                                       taskStateTracker,
+                                       stateStoreJobConfig);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 99f225a..a2f0d7b 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -24,6 +24,7 @@ import org.testng.annotations.Test;
 
 import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
 import 
org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite;
+import 
org.apache.gobblin.cluster.suite.IntegrationDedicatedTaskDriverClusterSuite;
 import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
 import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
 import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite;
@@ -34,7 +35,8 @@ public class ClusterIntegrationTest {
   private IntegrationBasicSuite suite;
 
   @Test
-  public void testJobShouldComplete() throws Exception {
+  public void testJobShouldComplete()
+      throws Exception {
     this.suite = new IntegrationBasicSuite();
     runAndVerify();
   }
@@ -54,6 +56,13 @@ public class ClusterIntegrationTest {
   }
 
   @Test(enabled = false)
+  public void testDedicatedTaskDriverCluster()
+      throws Exception {
+    this.suite = new IntegrationDedicatedTaskDriverClusterSuite();
+    runAndVerify();
+  }
+
+  @Test(enabled = false)
   public void testJobWithTag()
       throws Exception {
     this.suite = new IntegrationJobTagSuite();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index a104b7d..e91c984 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -48,8 +48,10 @@ import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.AvroDataWriterBuilder;
 import org.apache.gobblin.writer.Destination;
 import org.apache.gobblin.writer.WriterOutputFormat;
@@ -140,9 +142,23 @@ public class GobblinHelixTaskTest {
     Mockito.when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig);
     
Mockito.when(taskCallbackContext.getManager()).thenReturn(this.helixManager);
 
+
+    TaskRunnerSuiteBase.Builder builder = new 
TaskRunnerSuiteBase.Builder(ConfigFactory.empty());
+    builder.setInstanceName("TestInstance")
+        .setApplicationName("TestApplication")
+        .setAppWorkPath(appWorkDir)
+        .setContainerMetrics(Optional.absent())
+        .setFileSystem(localFs)
+        .setJobHelixManager(this.helixManager)
+        .setApplicationId("TestApplication-1")
+        .build();
+
     GobblinHelixTaskFactory gobblinHelixTaskFactory =
-        new GobblinHelixTaskFactory(Optional.<ContainerMetrics>absent(), 
this.taskExecutor, this.taskStateTracker,
-            this.localFs, this.appWorkDir, ConfigFactory.empty(), 
this.helixManager);
+        new GobblinHelixTaskFactory(builder,
+                                    this.taskExecutor,
+                                    this.taskStateTracker,
+                                    ConfigFactory.empty());
+
     this.gobblinHelixTask = (GobblinHelixTask) 
gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index 6f758b8..5a4a977 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -30,6 +30,7 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -39,6 +40,7 @@ import org.apache.curator.test.TestingServer;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
@@ -69,11 +71,14 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 public class IntegrationBasicSuite {
   public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
   public static final String WORKER_INSTANCE_0 = "WorkerInstance_0";
+  public static final String TEST_INSTANCE_NAME_KEY = "worker.instance.name";
 
   // manager and workers
   protected Config managerConfig;
+  protected Collection<Config> taskDriverConfigs = Lists.newArrayList();
   protected Collection<Config> workerConfigs = Lists.newArrayList();
   protected Collection<GobblinTaskRunner> workers = Lists.newArrayList();
+  protected Collection<GobblinTaskRunner> taskDrivers = Lists.newArrayList();
   protected GobblinClusterManager manager;
 
   protected Path workPath;
@@ -96,6 +101,7 @@ public class IntegrationBasicSuite {
 
   private void initConfig() {
     this.managerConfig = this.getManagerConfig();
+    this.taskDriverConfigs = this.getTaskDriverConfigs();
     this.workerConfigs = this.getWorkerConfigs();
   }
 
@@ -164,7 +170,7 @@ public class IntegrationBasicSuite {
     }
   }
 
-  private Config getClusterConfig() {
+  protected Config getClusterConfig() {
     URL url = Resources.getResource("BasicCluster.conf");
     Config config = ConfigFactory.parseURL(url);
 
@@ -185,6 +191,10 @@ public class IntegrationBasicSuite {
     return managerConfig.resolve();
   }
 
+  protected Collection<Config> getTaskDriverConfigs() {
+    return new ArrayList<>();
+  }
+
   protected Collection<Config> getWorkerConfigs() {
     // worker config initialization
     URL url = Resources.getResource("BasicWorker.conf");
@@ -193,6 +203,13 @@ public class IntegrationBasicSuite {
     return Lists.newArrayList(workerConfig.resolve());
   }
 
+  protected Config addInstanceName(Config baseConfig, String instanceName) {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(IntegrationBasicSuite.TEST_INSTANCE_NAME_KEY, instanceName);
+    Config instanceConfig = ConfigFactory.parseMap(configMap);
+    return instanceConfig.withFallback(baseConfig).resolve();
+  }
+
   public void waitForAndVerifyOutputFiles() throws Exception {
     AssertWithBackoff asserter = 
AssertWithBackoff.create().logger(log).timeoutMs(120_000)
         .maxSleepMs(100).backoffFactor(1.5);
@@ -215,6 +232,7 @@ public class IntegrationBasicSuite {
     this.testingZKServer.start();
     createHelixCluster();
     startWorker();
+    startTaskDriver();
     startManager();
   }
 
@@ -226,19 +244,51 @@ public class IntegrationBasicSuite {
     this.manager.start();
   }
 
-  protected void startWorker() throws Exception {
-    this.workers.add(new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, 
WORKER_INSTANCE_0,
-        TestHelper.TEST_APPLICATION_ID, "1",
-        this.workerConfigs.iterator().next(), Optional.absent()));
+  private void startTaskDriver() throws Exception {
+    for (Config taskDriverConfig: this.taskDriverConfigs) {
+      GobblinTaskRunner runner = new 
GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME,
+          taskDriverConfig.getString(TEST_INSTANCE_NAME_KEY),
+          TestHelper.TEST_APPLICATION_ID, "1",
+          taskDriverConfig, Optional.absent());
+      this.taskDrivers.add(runner);
+
+      // Need to run in another thread since the start call will not return 
until the stop method
+      // is called.
+      Thread workerThread = new Thread(runner::start);
+      workerThread.start();
+    }
+  }
 
-    // Need to run in another thread since the start call will not return 
until the stop method
-    // is called.
-    Thread workerThread = new Thread(this.workers.iterator().next()::start);
-    workerThread.start();
+  private void startWorker() throws Exception {
+    if (workerConfigs.size() == 1) {
+      this.workers.add(new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, 
WORKER_INSTANCE_0,
+          TestHelper.TEST_APPLICATION_ID, "1",
+          this.workerConfigs.iterator().next(), Optional.absent()));
+
+      // Need to run in another thread since the start call will not return 
until the stop method
+      // is called.
+      Thread workerThread = new Thread(this.workers.iterator().next()::start);
+      workerThread.start();
+    } else {
+      // Each workerConfig corresponds to a worker instance
+      for (Config workerConfig: this.workerConfigs) {
+        GobblinTaskRunner runner = new 
GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME,
+            workerConfig.getString(TEST_INSTANCE_NAME_KEY),
+            TestHelper.TEST_APPLICATION_ID, "1",
+            workerConfig, Optional.absent());
+        this.workers.add(runner);
+
+        // Need to run in another thread since the start call will not return 
until the stop method
+        // is called.
+        Thread workerThread = new Thread(runner::start);
+        workerThread.start();
+      }
+    }
   }
 
   public void shutdownCluster() throws InterruptedException, IOException {
-    workers.forEach(runner->runner.stop());
+    this.workers.forEach(runner->runner.stop());
+    this.taskDrivers.forEach(runner->runner.stop());
     this.manager.stop();
     this.testingZKServer.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
new file mode 100644
index 0000000..1a14451
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.suite;
+
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Resources;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.cluster.ClusterIntegrationTest;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.HelixUtils;
+
+/**
+ * <p> A test suite used for {@link 
ClusterIntegrationTest#testDedicatedTaskDriverCluster()}.
+ *
+ * <p> We will have two separate clusters: one for planning jobs and one for 
actual jobs.
+ *
+ * <p> Each planning job is submitted by the manager instance and reaches the task 
driver
+ * instance via the 'task driver cluster' (or 'planning job cluster').
+ *
+ * <p> Each actual job is submitted by the task driver instance and reaches the 
worker
+ * instance via the 'job cluster'.
+ */
+public class IntegrationDedicatedTaskDriverClusterSuite extends 
IntegrationBasicSuite {
+
+  @Override
+  public void createHelixCluster() throws Exception {
+    super.createHelixCluster();
+    String zkConnectionString = managerConfig
+        .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String managerClusterName = managerConfig
+        .getString(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, 
managerClusterName);
+    String taskDriverClusterName = taskDriverConfigs.iterator().next()
+        
.getString(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, 
taskDriverClusterName);
+  }
+
+  @Override
+  protected Config getManagerConfig() {
+    Map<String, String> configMap = new HashMap<>();
+    
configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,
 "true");
+    configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, 
"ManagerCluster");
+    Config config = ConfigFactory.parseMap(configMap);
+    return config.withFallback(super.getManagerConfig()).resolve();
+  }
+
+  @Override
+  protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
+    Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
+        GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, 
true))
+        .withFallback(rawJobConfig);
+    return ImmutableMap.of("HelloWorldJob", newConfig);
+  }
+
+  @Override
+  protected Collection<Config> getTaskDriverConfigs() {
+    // task driver config initialization
+    URL url = Resources.getResource("BasicTaskDriver.conf");
+    Config taskDriverConfig = ConfigFactory.parseURL(url);
+    taskDriverConfig = taskDriverConfig.withFallback(getClusterConfig());
+    Config taskDriver1 = addInstanceName(taskDriverConfig, "TaskDriver1");
+    return ImmutableList.of(taskDriver1);
+  }
+
+  @Override
+  protected Config getClusterConfig() {
+    Map<String, String> configMap = new HashMap<>();
+    
configMap.put(GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED,
 "true");
+    
configMap.put(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, 
"TaskDriverCluster");
+    Config config = ConfigFactory.parseMap(configMap);
+    return config.withFallback(super.getClusterConfig()).resolve();
+  }
+
+  @Override
+  protected Collection<Config> getWorkerConfigs() {
+    Config baseConfig = super.getWorkerConfigs().iterator().next();
+    Config workerConfig1 = addInstanceName(baseConfig, "Worker1");
+    return ImmutableList.of(workerConfig1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
index 4424cae..905db6d 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
@@ -54,7 +54,6 @@ import org.apache.gobblin.testing.AssertWithBackoff;
  */
 @Slf4j
 public class IntegrationJobTagSuite extends IntegrationBasicSuite {
-  private static final String WORKER_INSTANCE_NAME_KEY = 
"worker.instance.name";
   private static final String WORKER_INSTANCE_1 = "WorkerInstance_1";
   private static final String WORKER_INSTANCE_2 = "WorkerInstance_2";
   private static final String WORKER_INSTANCE_3 = "WorkerInstance_3";
@@ -84,7 +83,7 @@ public class IntegrationJobTagSuite extends 
IntegrationBasicSuite {
     Map<String, String> configMap = new HashMap<>();
     if (tags!= null && tags.size() > 0) {
       configMap.put(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
Joiner.on(',').join(tags));
-      configMap.put(WORKER_INSTANCE_NAME_KEY, instanceName);
+      configMap.put(IntegrationBasicSuite.TEST_INSTANCE_NAME_KEY, 
instanceName);
     }
     return ConfigFactory.parseMap(configMap).withFallback(workerConfig);
   }
@@ -105,22 +104,6 @@ public class IntegrationJobTagSuite extends 
IntegrationBasicSuite {
     return 
ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
 "JobTagTaskRunnerSuiteBuilder")).withFallback(workerConfig);
   }
 
-  @Override
-  protected void startWorker() throws Exception {
-    // Each workerConfig corresponds to a worker instance
-    for (Config workerConfig: this.workerConfigs) {
-      GobblinTaskRunner runner = new 
GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, 
workerConfig.getString(WORKER_INSTANCE_NAME_KEY),
-          TestHelper.TEST_APPLICATION_ID, "1",
-          workerConfig, Optional.absent());
-      this.workers.add(runner);
-
-      // Need to run in another thread since the start call will not return 
until the stop method
-      // is called.
-      Thread workerThread = new Thread(runner::start);
-      workerThread.start();
-    }
-  }
-
   /**
    * Create different jobs with different tags
    */
@@ -163,7 +146,7 @@ public class IntegrationJobTagSuite extends 
IntegrationBasicSuite {
     private String instanceName;
     public JobTagTaskRunnerSuiteBuilder(Config config) {
       super(config);
-      this.instanceName = 
config.getString(IntegrationJobTagSuite.WORKER_INSTANCE_NAME_KEY);
+      this.instanceName = 
config.getString(IntegrationJobTagSuite.TEST_INSTANCE_NAME_KEY);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/resources/BasicTaskDriver.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/BasicTaskDriver.conf 
b/gobblin-cluster/src/test/resources/BasicTaskDriver.conf
new file mode 100644
index 0000000..84bc5b2
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/BasicTaskDriver.conf
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Cluster / Helix configuration properties
+gobblin.cluster.taskDriver.enabled=true
\ No newline at end of file

Reply via email to