Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 31986790e -> c1e9cf250


[GOBBLIN-490] Allow jobs to be re-distributed to worker nodes and launch there

Closes #2360 from yukuai518/jobDistribute


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c1e9cf25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c1e9cf25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c1e9cf25

Branch: refs/heads/master
Commit: c1e9cf250c3e844dcf649b730fb9a6b464b740a9
Parents: 3198679
Author: Kuai Yu <[email protected]>
Authored: Fri Jun 15 16:07:37 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Fri Jun 15 16:07:37 2018 -0700

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |  10 +
 ...blinHelixDistributeJobExecutionLauncher.java | 312 +++++++++++++++++++
 .../gobblin/cluster/GobblinHelixJobFactory.java |  63 ++++
 .../cluster/GobblinHelixJobLauncher.java        |  79 ++---
 .../cluster/GobblinHelixJobScheduler.java       |  63 +---
 .../gobblin/cluster/GobblinHelixJobTask.java    | 107 +++++++
 .../gobblin/cluster/GobblinTaskRunner.java      |  10 +-
 .../cluster/HelixRetriggeringJobCallable.java   | 150 +++++++++
 .../org/apache/gobblin/cluster/HelixUtils.java  | 109 ++++++-
 .../gobblin/cluster/TaskRunnerSuiteBase.java    |   6 +-
 .../cluster/TaskRunnerSuiteProcessModel.java    |  11 +-
 .../cluster/TaskRunnerSuiteThreadModel.java     |  10 +-
 .../gobblin/cluster/ClusterIntegrationTest.java |   8 +
 .../TaskRunnerSuiteForJobFactoryTest.java       | 113 +++++++
 .../cluster/TaskRunnerSuiteForJobTagTest.java   |   8 +-
 .../cluster/suite/IntegrationBasicSuite.java    |  39 ++-
 .../suite/IntegrationJobFactorySuite.java       |  81 +++++
 .../cluster/suite/IntegrationJobTagSuite.java   |  46 +--
 .../apache/gobblin/util/PropertiesUtils.java    |   4 +
 19 files changed, 1061 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 492edb8..648b5bc 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -47,6 +47,11 @@ public class GobblinClusterConfigurationKeys {
   public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
   public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + 
"workDir";
 
+  public static final String DISTRIBUTED_JOB_LAUNCHER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherEnabled";
+  public static final boolean DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED = false;
+  public static final String DISTRIBUTED_JOB_LAUNCHER_BUILDER = 
GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherBuilder";
+
+
   // Helix configuration properties.
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helix.cluster.name";
   public static final String MANAGER_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "manager.cluster.name";
@@ -66,6 +71,11 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobTag";
   public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "helixInstanceTags";
 
+  // Planning job properties
+  public static final String PLANNING_JOB_NAME_PREFIX = "PlanningJob";
+  public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + 
"planning.";
+  public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey";
+
   /**
    * A path pointing to a directory that contains job execution files to be 
executed by Gobblin. This directory can
    * have a nested structure.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
new file mode 100644
index 0000000..5800e8d
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import javax.annotation.Nonnull;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.api.ExecutionResult;
+import org.apache.gobblin.runtime.api.JobExecutionLauncher;
+import org.apache.gobblin.runtime.api.JobExecutionMonitor;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MonitoredObject;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * To avoid running all the task driver logic ({@link GobblinHelixJobLauncher}) 
on the same instance (node), this
+ * {@link JobExecutionLauncher} can distribute the original job (called 
planning job) to Helix. Helix will
+ * assign this job to one participant. The participant can parse the original 
job properties and run the task driver.
+ *
+ * <p>
+ *   For job submission, the Helix workflow name will be the original job name 
with prefix
+ *   {@link GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}. The 
Helix job name will be the auto-generated planning
+ *   job ID, which is stored in the job properties under {@link 
GobblinClusterConfigurationKeys#PLANNING_ID_KEY}.
+ * </p>
+ *
+ * <p>
+ *   We will associate this job to Helix's {@link 
org.apache.helix.task.TaskFactory}
+ *   by specifying {@link GobblinTaskRunner#GOBBLIN_JOB_FACTORY_NAME} in the 
{@link JobConfig.Builder}.
+ *   This job will only contain a single task, which is keyed by the planning ID.
+ * </p>
+ */
+@Alpha
+@Slf4j
+class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher {
+  protected HelixManager helixManager;
+  protected TaskDriver helixTaskDriver;
+  protected Properties sysProperties;
+  protected Properties jobProperties;
+  protected StateStores stateStores;
+
+  protected static final String PLANNING_WORK_UNIT_DIR_NAME = 
"_plan_workunits";
+  protected static final String PLANNING_TASK_STATE_DIR_NAME = 
"_plan_taskstates";
+  protected static final String PLANNING_JOB_STATE_DIR_NAME = 
"_plan_jobstates";
+
+  protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps.";
+
+  private final long jobQueueDeleteTimeoutSeconds;
+
+  public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws 
Exception {
+    this.helixManager = builder.manager;
+    this.helixTaskDriver = new TaskDriver(this.helixManager);
+    this.sysProperties = builder.sysProperties;
+    this.jobProperties = builder.jobProperties;
+
+    Config combined = ConfigUtils.propertiesToConfig(jobProperties)
+        .withFallback(ConfigUtils.propertiesToConfig(sysProperties));
+
+    Config stateStoreJobConfig = combined
+        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+            new URI(builder.appWorkDir.toUri().getScheme(), null, 
builder.appWorkDir.toUri().getHost(),
+                builder.appWorkDir.toUri().getPort(), null, null, 
null).toString()));
+
+    this.stateStores = new StateStores(stateStoreJobConfig,
+        builder.appWorkDir, PLANNING_TASK_STATE_DIR_NAME,
+        builder.appWorkDir, PLANNING_WORK_UNIT_DIR_NAME,
+        builder.appWorkDir, PLANNING_JOB_STATE_DIR_NAME);
+
+    this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(combined,
+        GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS);
+  }
+
+  @Setter
+  public static class Builder {
+    Properties sysProperties;
+    Properties jobProperties;
+    HelixManager manager;
+    Path appWorkDir;
+    public GobblinHelixDistributeJobExecutionLauncher build() throws Exception 
{
+      return new GobblinHelixDistributeJobExecutionLauncher(this);
+    }
+  }
+
+  private String getPlanningJobName (Properties jobProps) {
+    String jobName = JobState.getJobNameFromProps(jobProps);
+    return GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX + jobName;
+  }
+
+  protected String getPlanningJobId (Properties jobProps) {
+    if (jobProps.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) 
{
+      return 
jobProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+    }
+    String planningId = 
JobLauncherUtils.newJobId(getPlanningJobName(jobProps));
+    jobProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningId);
+    return planningId;
+  }
+
+  /**
+   * Create a job config builder which has a single task that wraps the 
original jobProps.
+   */
+  private JobConfig.Builder createPlanningJob (Properties jobProps) {
+    // Create a single task for job planning
+    String planningId = getPlanningJobId(jobProps);
+    Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
+    Map<String, String> rawConfigMap = Maps.newHashMap();
+    for (String key : jobProps.stringPropertyNames()) {
+      rawConfigMap.put(JOB_PROPS_PREFIX + key, (String)jobProps.get(key));
+    }
+    
rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, 
"true");
+
+    // Create a single Job which only contains a single task
+    taskConfigMap.put(planningId, TaskConfig.Builder.from(rawConfigMap));
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+
+    jobConfigBuilder.setTimeoutPerTask(PropertiesUtils.getPropAsLong(
+        jobProps,
+        ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
+        ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000);
+
+    jobConfigBuilder.setFailureThreshold(1);
+    
jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME);
+
+    return jobConfigBuilder;
+  }
+
+  /**
+   * Submit job to helix so that it can be re-assigned to one of its 
participants.
+   * @param jobName A planning job name which has prefix {@link 
GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}.
+   * @param jobId   A planning job id created by {@link 
GobblinHelixDistributeJobExecutionLauncher#getPlanningJobId}.
+   * @param jobConfigBuilder A job config builder which contains a single task.
+   */
+  private void submitJobToHelix(String jobName, String jobId, 
JobConfig.Builder jobConfigBuilder) throws Exception {
+    TaskDriver taskDriver = new TaskDriver(this.helixManager);
+    HelixUtils.submitJobToQueue(jobConfigBuilder,
+        jobName,
+        jobId,
+        taskDriver,
+        this.helixManager,
+        this.jobQueueDeleteTimeoutSeconds);
+  }
+
+  @Override
+  public DistributeJobMonitor launchJob(JobSpec jobSpec) {
+    return new DistributeJobMonitor(new 
DistributeJobCallable(this.jobProperties));
+  }
+
+  @AllArgsConstructor
+  private class DistributeJobCallable implements Callable<ExecutionResult> {
+    Properties jobProps;
+    @Override
+    public DistributeJobResult call()
+        throws Exception {
+      String planningName = getPlanningJobName(this.jobProps);
+      String planningId = getPlanningJobId(this.jobProps);
+      JobConfig.Builder builder = createPlanningJob(this.jobProps);
+      try {
+        submitJobToHelix(planningName, planningId, builder);
+        return waitForJobCompletion(planningName, planningId);
+      } catch (Exception e) {
+        log.error(planningName + " is not able to submit.");
+        return new DistributeJobResult(Optional.empty(), Optional.of(e));
+      }
+    }
+  }
+
+  private DistributeJobResult waitForJobCompletion(String planningName, String 
planningId) throws InterruptedException {
+    boolean timeoutEnabled = 
Boolean.parseBoolean(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+        ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+    long timeoutInSeconds = 
Long.parseLong(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+        ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+
+    try {
+      HelixUtils.waitJobCompletion(
+          GobblinHelixDistributeJobExecutionLauncher.this.helixManager,
+          planningName,
+          planningId,
+          timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
+      return getResultFromUserContent();
+    } catch (TimeoutException te) {
+      HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, 
planningName, 10L);
+      this.helixTaskDriver.delete(planningName);
+      this.helixTaskDriver.resume(planningName);
+      log.info("stopped the queue, deleted the job");
+      return new DistributeJobResult(Optional.empty(), Optional.of(te));
+    }
+  }
+
+  //TODO: change below to Helix UserContentStore
+  @VisibleForTesting
+  protected DistributeJobResult getResultFromUserContent() {
+    String planningId = getPlanningJobId(this.jobProperties);
+    try {
+      TaskState taskState = 
this.stateStores.getTaskStateStore().get(planningId, planningId, planningId);
+      return new DistributeJobResult(Optional.of(taskState.getProperties()), 
Optional.empty());
+    } catch (IOException e) {
+      return new DistributeJobResult(Optional.empty(), Optional.of(e));
+    }
+  }
+
+  @Getter
+  @AllArgsConstructor
+  static class DistributeJobResult implements ExecutionResult {
+    boolean isEarlyStopped = false;
+    Optional<Properties> properties;
+    Optional<Throwable> throwable;
+    public DistributeJobResult(Optional<Properties> properties, 
Optional<Throwable> throwable) {
+      this.properties = properties;
+      this.throwable = throwable;
+      if (properties.isPresent()) {
+        isEarlyStopped = 
PropertiesUtils.getPropAsBoolean(this.properties.get(), 
Partitioner.IS_EARLY_STOPPED, "false");
+      }
+    }
+  }
+
+  static class DistributeJobMonitor extends FutureTask<ExecutionResult> 
implements JobExecutionMonitor {
+    private ExecutorService executor = Executors.newSingleThreadExecutor();
+    public DistributeJobMonitor (Callable<ExecutionResult> c) {
+      super(c);
+      this.executor.execute(this);
+    }
+
+    @Override
+    public MonitoredObject getRunningState() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public StandardMetrics getMetrics() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Nonnull
+  @Override
+  public MetricContext getMetricContext() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isInstrumentationEnabled() {
+    return false;
+  }
+
+  @Override
+  public List<Tag<?>> generateTags(State state) {
+    return Lists.newArrayList();
+  }
+
+  @Override
+  public void switchMetricContext(List<Tag<?>> tags) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void switchMetricContext(MetricContext context) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
new file mode 100644
index 0000000..2f7ced2
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.net.URI;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.util.PathUtils;
+
+
+/**
+ * An implementation of Helix's {@link TaskFactory} for {@link 
GobblinHelixJobTask}s.
+ */
+@Slf4j
+public class GobblinHelixJobFactory implements TaskFactory {
+  protected Config sysConfig;
+  protected StateStores stateStores;
+
+  public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) {
+    this.sysConfig = builder.getConfig();
+    Path appWorkDir = builder.getAppWorkPath();
+    URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri();
+    Config stateStoreJobConfig = sysConfig
+        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
+            ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
+
+    this.stateStores = new StateStores(stateStoreJobConfig,
+        appWorkDir, 
GobblinHelixDistributeJobExecutionLauncher.PLANNING_TASK_STATE_DIR_NAME,
+        appWorkDir, 
GobblinHelixDistributeJobExecutionLauncher.PLANNING_WORK_UNIT_DIR_NAME,
+        appWorkDir, 
GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME);
+  }
+
+  @Override
+  public Task createNewTask(TaskCallbackContext context) {
+    return new GobblinHelixJobTask(context, this.sysConfig, stateStores);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 6b86c5c..231575e 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -21,9 +21,11 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,12 +33,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -279,27 +278,8 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
    * Submit a job to run.
    */
   private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws 
Exception {
-    WorkflowConfig workflowConfig = 
this.helixTaskDriver.getWorkflowConfig(this.helixManager, this.helixQueueName);
-
-    // If the queue is present, but in delete state then wait for cleanup 
before recreating the queue
-    if (workflowConfig != null && workflowConfig.getTargetState() == 
TargetState.DELETE) {
-      GobblinHelixTaskDriver gobblinHelixTaskDriver = new 
GobblinHelixTaskDriver(this.helixManager);
-      gobblinHelixTaskDriver.deleteWorkflow(this.helixQueueName, 
this.jobQueueDeleteTimeoutSeconds);
-      // if we get here then the workflow was successfully deleted
-      workflowConfig = null;
-    }
-
-    // Create one queue for each job with the job name being the queue name
-    if (workflowConfig == null) {
-        JobQueue jobQueue = new JobQueue.Builder(this.helixQueueName).build();
-        this.helixTaskDriver.createQueue(jobQueue);
-        LOGGER.info("Created job queue {}", this.helixQueueName);
-    } else {
-      LOGGER.info("Job queue {} already exists", this.helixQueueName);
-    }
-
-    // Put the job into the queue
-    this.helixTaskDriver.enqueueJob(this.jobContext.getJobName(), 
this.jobContext.getJobId(), jobConfigBuilder);
+    HelixUtils.submitJobToQueue(jobConfigBuilder, this.helixQueueName, 
this.jobContext.getJobId(),
+        this.helixTaskDriver, this.helixManager, 
this.jobQueueDeleteTimeoutSeconds);
   }
 
   public void launchJob(@Nullable JobListener jobListener)
@@ -380,45 +360,22 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
     long timeoutInSeconds = 
Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
         ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
-    long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
-    while (!timeoutEnabled || System.currentTimeMillis() <= endTime) {
-      WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName);
-      if (workflowContext != null) {
-        org.apache.helix.task.TaskState helixJobState = 
workflowContext.getJobState(this.jobResourceName);
-        if (helixJobState == org.apache.helix.task.TaskState.COMPLETED ||
-            helixJobState == org.apache.helix.task.TaskState.FAILED ||
-            helixJobState == org.apache.helix.task.TaskState.STOPPED) {
-          return;
-        }
-      }
-      Thread.sleep(1000);
-    }
-    helixTaskDriverWaitToStop(this.helixQueueName, 10L);
-    try {
-      cancelJob(this.jobListener);
-    } catch (JobException e) {
-      throw new RuntimeException("Unable to cancel job " + 
jobContext.getJobName() + ": ", e);
-    }
-    this.helixTaskDriver.resume(this.helixQueueName);
-    LOGGER.info("stopped the queue, deleted the job");
-  }
 
-  /**
-   * Because fix 
https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4
-   * is not available in currently used version 0.6.9
-   */
-  private void helixTaskDriverWaitToStop(String workflow, long 
timeoutInSeconds) throws InterruptedException {
-    this.helixTaskDriver.stop(workflow);
-    long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
-    while (System.currentTimeMillis() <= endTime) {
-      WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName);
-      if (workflowContext == null || workflowContext.getWorkflowState()
-          .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) {
-        Thread.sleep(1000);
-      } else {
-        LOGGER.info("Successfully stopped the queue");
-        return;
+    try {
+      HelixUtils.waitJobCompletion(
+          this.helixManager,
+          this.helixQueueName,
+          this.jobContext.getJobId(),
+          timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
+    } catch (TimeoutException te) {
+      HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, 
helixQueueName, 10L);
+      try {
+        cancelJob(this.jobListener);
+      } catch (JobException e) {
+        throw new RuntimeException("Unable to cancel job " + 
jobContext.getJobName() + ": ", e);
       }
+      this.helixTaskDriver.resume(this.helixQueueName);
+      LOGGER.info("stopped the queue, deleted the job");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index fac7242..b991406 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -21,7 +21,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -41,7 +40,6 @@ import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 
 import javax.annotation.Nonnull;
-import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
@@ -56,7 +54,6 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.api.JobExecutionLauncher;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
@@ -65,7 +62,6 @@ import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
@@ -277,56 +273,27 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
 
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
-    new RetriggeringJobCallable(jobProps, jobListener).call();
+    new HelixRetriggeringJobCallable(this, this.properties, jobProps, 
jobListener, this.appWorkDir, this.helixManager).call();
   }
 
   @Override
   public GobblinHelixJobLauncher buildJobLauncher(Properties jobProps)
       throws Exception {
-    return new GobblinHelixJobLauncher(jobProps, this.helixManager, 
this.appWorkDir, this.metadataTags, this.jobRunningMap);
-  }
-
-  private class RetriggeringJobCallable implements Callable {
-    Properties jobProps;
-    JobListener jobListener;
-
-    public RetriggeringJobCallable(Properties jobProps, JobListener 
jobListener) {
-      this.jobProps = jobProps;
-      this.jobListener = jobListener;
-    }
+    Properties combinedProps = new Properties();
+    combinedProps.putAll(properties);
+    combinedProps.putAll(jobProps);
 
-    private boolean isRetriggeringEnabled() {
-      return PropertiesUtils.getPropAsBoolean(jobProps, 
ConfigurationKeys.JOB_RETRIGGERING_ENABLED, 
ConfigurationKeys.DEFAULT_JOB_RETRIGGERING_ENABLED);
-    }
-
-    @Getter
-    JobLauncher currentJobLauncher = null;
-
-    @Override
-    public Void call() throws JobException {
-      try {
-        while (true) {
-          currentJobLauncher = buildJobLauncher(jobProps);
-          boolean isEarlyStopped = runJob(jobProps, jobListener, 
currentJobLauncher);
-          boolean isRetriggerEnabled = this.isRetriggeringEnabled();
-          if (isEarlyStopped && isRetriggerEnabled) {
-            LOGGER.info("Job {} will be re-triggered.", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-          } else {
-            break;
-          }
-          currentJobLauncher = null;
-        }
-      } catch (Exception e) {
-        LOGGER.error("Failed to run job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
-        throw new JobException("Failed to run job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
-      }
-
-      return null;
-    }
+    return new GobblinHelixJobLauncher(combinedProps, this.helixManager, 
this.appWorkDir, this.metadataTags, this.jobRunningMap);
   }
 
   public Future<?> scheduleJobImmediately(Properties jobProps, JobListener 
jobListener) {
-    RetriggeringJobCallable retriggeringJob = new 
RetriggeringJobCallable(jobProps, jobListener);
+    HelixRetriggeringJobCallable retriggeringJob = new 
HelixRetriggeringJobCallable(this,
+        this.properties,
+        jobProps,
+        jobListener,
+        this.appWorkDir,
+        this.helixManager);
+
     final Future<?> future = this.jobExecutor.submit(retriggeringJob);
     return new Future() {
       @Override
@@ -336,10 +303,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         }
         boolean result = true;
         try {
-          JobLauncher jobLauncher = retriggeringJob.getCurrentJobLauncher();
-          if (jobLauncher != null) {
-            jobLauncher.cancelJob(jobListener);
-          }
+          retriggeringJob.cancel();
         } catch (JobException e) {
           LOGGER.error("Failed to cancel job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
           result = false;
@@ -377,7 +341,6 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     LOGGER.info("Received new job configuration of job " + 
newJobArrival.getJobName());
     try {
       Properties jobConfig = new Properties();
-      jobConfig.putAll(this.properties);
       jobConfig.putAll(newJobArrival.getJobConfig());
 
       metrics.updateTimeBeforeJobScheduling(jobConfig);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
new file mode 100644
index 0000000..f60852e
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskResult;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * An implementation of Helix's {@link org.apache.helix.task.Task} intended to run the original
+ * {@link GobblinHelixJobLauncher}.
+ *
+ * <p>The job configuration is the system configuration overlaid with every entry of the Helix task
+ * config map whose key starts with {@link GobblinHelixDistributeJobExecutionLauncher#JOB_PROPS_PREFIX}
+ * (with that prefix removed). Launching the real job is still a TODO; {@link #run()} currently only
+ * persists a result marker to the task state store.
+ */
+@Slf4j
+public class GobblinHelixJobTask implements Task {
+
+  private final TaskConfig taskConfig;
+  private final Config sysConfig;
+  private final Properties jobConfig;
+  private final StateStores stateStores;
+  private final String planningJobId;
+
+  /**
+   * @param context the Helix callback context whose task config map carries the prefixed job properties
+   * @param sysConfig the system configuration used as the base of the job configuration
+   * @param stateStores the state stores used to persist the task result
+   * @throws RuntimeException if the resulting job configuration has no
+   *         {@link GobblinClusterConfigurationKeys#PLANNING_ID_KEY}
+   */
+  public GobblinHelixJobTask(TaskCallbackContext context,
+      Config sysConfig,
+      StateStores stateStores) {
+    this.taskConfig = context.getTaskConfig();
+    this.sysConfig = sysConfig;
+    this.stateStores = stateStores;
+    this.jobConfig = ConfigUtils.configToProperties(sysConfig);
+
+    String jobPropsPrefix = GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX;
+    for (Map.Entry<String, String> entry : this.taskConfig.getConfigMap().entrySet()) {
+      String prefixedKey = entry.getKey();
+      if (prefixedKey.startsWith(jobPropsPrefix)) {
+        // Strip the prefix by length; String#replaceFirst would interpret the prefix as a regular expression.
+        this.jobConfig.put(prefixedKey.substring(jobPropsPrefix.length()), entry.getValue());
+      }
+    }
+
+    if (!this.jobConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) {
+      throw new RuntimeException("Job doesn't have plannning ID");
+    }
+
+    this.planningJobId = this.jobConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+  }
+
+  /**
+   * Records a "not early stopped" result for the planning job.
+   *
+   * @return a COMPLETED result, or a FAILED result if the result could not be persisted
+   */
+  @Override
+  public TaskResult run() {
+    log.info("We will run planning job " + this.planningJobId);
+
+    // TODO: We should run GobblinHelixJobLauncher#launchJob() here
+
+    try {
+      setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, "false"));
+    } catch (IOException e) {
+      // Keep the cause in the log; the TaskResult only carries a message string.
+      log.error("Failed to persist the result of planning job " + this.planningJobId, e);
+      return new TaskResult(TaskResult.Status.FAILED, "State store cannot be persisted for job " + planningJobId);
+    }
+    return new TaskResult(TaskResult.Status.COMPLETED, "");
+  }
+
+  /**
+   * Persists the given key/value pairs as a {@link TaskState} in the task state store, using the
+   * planning job ID as job ID, task ID, task key, store name and table name.
+   *
+   * @param keyValues the properties to record
+   * @throws IOException if the state store cannot be written
+   */
+  //TODO: change below to Helix UserContentStore
+  @VisibleForTesting
+  protected void setResultToUserContent(Map<String, String> keyValues) throws IOException {
+    WorkUnitState wus = new WorkUnitState();
+    wus.setProp(ConfigurationKeys.JOB_ID_KEY, this.planningJobId);
+    wus.setProp(ConfigurationKeys.TASK_ID_KEY, this.planningJobId);
+    wus.setProp(ConfigurationKeys.TASK_KEY_KEY, this.planningJobId);
+    keyValues.forEach((key, value)->wus.setProp(key, value));
+    TaskState taskState = new TaskState(wus);
+
+    this.stateStores.getTaskStateStore().put(this.planningJobId, this.planningJobId, taskState);
+  }
+
+  @Override
+  public void cancel() {
+    // TODO: We should delete the real job.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index b6f04f1..fb7136d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -59,7 +59,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Service;
@@ -117,6 +116,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
 
   static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory";
 
+  static final String GOBBLIN_JOB_FACTORY_NAME = "GobblinJobFactory";
+
   private final String helixInstanceName;
 
   private final String clusterName;
@@ -175,7 +176,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
         .setFileSystem(this.fs)
         .setHelixManager(this.helixManager).build();
 
-    this.taskStateModelFactory = 
createTaskStateModelFactory(suite.getTaskFactory());
+    this.taskStateModelFactory = 
createTaskStateModelFactory(suite.getTaskFactoryMap());
     this.metrics = suite.getTaskMetrics();
     this.metricContext = suite.getMetricContext();
     this.services.addAll(suite.getServices());
@@ -205,10 +206,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
         this.clusterName, this.helixInstanceName, InstanceType.PARTICIPANT, 
zkConnectionString);
   }
 
-  private TaskStateModelFactory createTaskStateModelFactory(TaskFactory 
factory) {
-    Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
-
-    taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, factory);
+  private TaskStateModelFactory createTaskStateModelFactory(Map<String, 
TaskFactory> taskFactoryMap) {
     TaskStateModelFactory taskStateModelFactory =
         new TaskStateModelFactory(this.helixManager, taskFactoryMap);
     this.helixManager.getStateMachineEngine()

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
new file mode 100644
index 0000000..ce3619b
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.api.ExecutionResult;
+import org.apache.gobblin.runtime.api.JobExecutionMonitor;
+import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * A {@link Callable} that runs {@link JobLauncher} multiple times iff re-triggering is enabled and job stops early.
+ *
+ * <p>Depending on {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} the job is either
+ * launched through the scheduler's job launcher, or through a
+ * {@link GobblinHelixDistributeJobExecutionLauncher}.
+ *
+ * <p>{@link #call()} runs on an executor thread while {@link #cancel()} may be invoked from another thread,
+ * so the "current" launcher/monitor references are volatile.
+ */
+@Slf4j
+@Alpha
+class HelixRetriggeringJobCallable implements Callable<Void> {
+  private final GobblinHelixJobScheduler jobScheduler;
+  private final Properties sysProps;
+  private final Properties jobProps;
+  private final JobListener jobListener;
+  private final Path appWorkDir;
+  private final HelixManager helixManager;
+
+  // Written by the thread executing call(), read by the thread invoking cancel().
+  private volatile JobLauncher currentJobLauncher = null;
+  private volatile JobExecutionMonitor currentJobMonitor = null;
+
+  public HelixRetriggeringJobCallable(
+      GobblinHelixJobScheduler jobScheduler,
+      Properties sysProps,
+      Properties jobProps,
+      JobListener jobListener,
+      Path appWorkDir,
+      HelixManager helixManager) {
+    this.jobScheduler = jobScheduler;
+    this.sysProps = sysProps;
+    this.jobProps = jobProps;
+    this.jobListener = jobListener;
+    this.appWorkDir = appWorkDir;
+    this.helixManager = helixManager;
+  }
+
+  /** Reads the re-triggering flag from the job properties only. */
+  private boolean isRetriggeringEnabled() {
+    return PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.JOB_RETRIGGERING_ENABLED,
+        ConfigurationKeys.DEFAULT_JOB_RETRIGGERING_ENABLED);
+  }
+
+  /** Reads the distributed-launcher flag from the system properties overlaid with the job properties. */
+  private boolean isDistributeJobEnabled() {
+    Properties combinedProps = new Properties();
+    combinedProps.putAll(sysProps);
+    combinedProps.putAll(jobProps);
+    return (PropertiesUtils.getPropAsBoolean(combinedProps,
+        GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED,
+        Boolean.toString(GobblinClusterConfigurationKeys.DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED)));
+  }
+
+  /**
+   * Runs the job, re-triggering it while it stops early and re-triggering is enabled.
+   *
+   * @return always {@code null}
+   * @throws JobException if any run fails
+   */
+  @Override
+  public Void call() throws JobException {
+    if (isDistributeJobEnabled()) {
+      launchJobExecutionLauncherLoop();
+    } else {
+      launchJobLauncherLoop();
+    }
+
+    return null;
+  }
+
+  /** Builds a fresh job launcher through the scheduler for every (re-)trigger and runs the job with it. */
+  private void launchJobLauncherLoop() throws JobException {
+    try {
+      while (true) {
+        currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps);
+        boolean isEarlyStopped = this.jobScheduler.runJob(jobProps, jobListener, currentJobLauncher);
+        boolean isRetriggerEnabled = this.isRetriggeringEnabled();
+        if (isEarlyStopped && isRetriggerEnabled) {
+          log.info("Job {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+        } else {
+          break;
+        }
+        currentJobLauncher = null;
+      }
+    } catch (Exception e) {
+      log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+      throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+    }
+  }
+
+  /**
+   * Builds a distributed job execution launcher for every (re-)trigger, launches the job and waits
+   * for its result. The builder class is taken from
+   * {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_BUILDER} in the job properties.
+   */
+  private void launchJobExecutionLauncherLoop() throws JobException {
+    try {
+      while (true) {
+        String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
+            GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
+        GobblinHelixDistributeJobExecutionLauncher.Builder builder =
+            GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(
+                new ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
+
+        builder.setSysProperties(this.sysProps);
+        builder.setJobProperties(this.jobProps);
+        builder.setManager(this.helixManager);
+        builder.setAppWorkDir(this.appWorkDir);
+
+        this.currentJobMonitor = builder.build().launchJob(null);
+        ExecutionResult result = this.currentJobMonitor.get();
+        boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped();
+        boolean isRetriggerEnabled = this.isRetriggeringEnabled();
+        if (isEarlyStopped && isRetriggerEnabled) {
+          log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+        } else {
+          break;
+        }
+        currentJobMonitor = null;
+      }
+    } catch (Exception e) {
+      log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+      throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+    }
+  }
+
+  /**
+   * Cancels whichever run is currently in flight, if any.
+   *
+   * @throws JobException if the job launcher fails to cancel the job
+   */
+  public void cancel() throws JobException {
+    // Snapshot the fields: the running loop may reset them to null between the check and the call.
+    JobLauncher launcher = this.currentJobLauncher;
+    JobExecutionMonitor monitor = this.currentJobMonitor;
+    if (launcher != null) {
+      launcher.cancelJob(this.jobListener);
+    } else if (monitor != null) {
+      monitor.cancel(false);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 8be0621..0c5dbec 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -17,12 +17,24 @@
 
 package org.apache.gobblin.cluster;
 
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.helix.HelixManager;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TargetState;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 
-import org.apache.gobblin.annotation.Alpha;
+import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.annotation.Alpha;
 
 /**
  * A utility class for working with Gobblin on Helix.
@@ -30,6 +42,7 @@ import org.apache.gobblin.annotation.Alpha;
  * @author Yinan Li
  */
 @Alpha
+@Slf4j
 public class HelixUtils {
 
   /**
@@ -38,7 +51,9 @@ public class HelixUtils {
    * @param zkConnectionString the ZooKeeper connection string
    * @param clusterName the Helix cluster name
    */
-  public static void createGobblinHelixCluster(String zkConnectionString, 
String clusterName) {
+  public static void createGobblinHelixCluster(
+      String zkConnectionString,
+      String clusterName) {
     createGobblinHelixCluster(zkConnectionString, clusterName, true);
   }
 
@@ -49,7 +64,10 @@ public class HelixUtils {
    * @param clusterName the Helix cluster name
    * @param overwrite true to overwrite exiting cluster, false to reuse 
existing cluster
    */
-  public static void createGobblinHelixCluster(String zkConnectionString, 
String clusterName, boolean overwrite) {
+  public static void createGobblinHelixCluster(
+      String zkConnectionString,
+      String clusterName,
+      boolean overwrite) {
     ClusterSetup clusterSetup = new ClusterSetup(zkConnectionString);
     // Create the cluster and overwrite if it already exists
     clusterSetup.addCluster(clusterName, overwrite);
@@ -65,7 +83,90 @@ public class HelixUtils {
    * @param instanceId an integer instance ID
    * @return a Helix instance name that is a concatenation of the given prefix 
and instance ID
    */
-  public static String getHelixInstanceName(String namePrefix, int instanceId) 
{
+  public static String getHelixInstanceName(
+      String namePrefix,
+      int instanceId) {
     return namePrefix + "_" + instanceId;
   }
+
+  /**
+   * Enqueues a job into the named Helix job queue, creating the queue first when it does not exist.
+   *
+   * <p>If the queue exists but its target state is {@link TargetState#DELETE}, the pending deletion is
+   * completed first (bounded by {@code jobQueueDeleteTimeoutSeconds}) and the queue is then recreated.
+   *
+   * @param jobConfigBuilder builder of the Helix job config to enqueue
+   * @param queueName name of the job queue
+   * @param jobName name of the job inside the queue
+   * @param helixTaskDriver driver used to look up, create and fill the queue
+   * @param helixManager the Helix manager
+   * @param jobQueueDeleteTimeoutSeconds timeout passed to the workflow deletion
+   */
+  public static void submitJobToQueue(
+      JobConfig.Builder jobConfigBuilder,
+      String queueName,
+      String jobName,
+      TaskDriver helixTaskDriver,
+      HelixManager helixManager,
+      long jobQueueDeleteTimeoutSeconds) throws Exception {
+
+    WorkflowConfig existingQueueConfig = helixTaskDriver.getWorkflowConfig(helixManager, queueName);
+    boolean queueExists = existingQueueConfig != null;
+
+    // A queue that is marked for deletion must be fully cleaned up before it can be recreated
+    if (queueExists && existingQueueConfig.getTargetState() == TargetState.DELETE) {
+      new GobblinHelixTaskDriver(helixManager).deleteWorkflow(queueName, jobQueueDeleteTimeoutSeconds);
+      // reaching this point means the workflow was successfully deleted
+      queueExists = false;
+    }
+
+    // One queue per job, named after the job
+    if (queueExists) {
+      log.info("Job queue {} already exists", queueName);
+    } else {
+      helixTaskDriver.createQueue(new JobQueue.Builder(queueName).build());
+      log.info("Created job queue {}", queueName);
+    }
+
+    // Finally hand the job over to the queue
+    helixTaskDriver.enqueueJob(queueName, jobName, jobConfigBuilder);
+  }
+
+  /**
+   * Polls the workflow context of the given queue once per second until the job is reported as
+   * COMPLETED, FAILED or STOPPED.
+   *
+   * @param helixManager the Helix manager used to read the workflow context
+   * @param queueName name of the job queue
+   * @param jobName name of the job inside the queue
+   * @param timeoutInSeconds maximum time to wait; when absent the method waits indefinitely
+   * @throws InterruptedException if the waiting thread is interrupted while sleeping
+   * @throws TimeoutException if a timeout was given and it elapsed before the job reached one of those states
+   */
+  public static void waitJobCompletion(
+      HelixManager helixManager,
+      String queueName,
+      String jobName,
+      Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException {
+
+    log.info("Waiting for job to complete...");
+    long endTime = 0;
+    if (timeoutInSeconds.isPresent()) {
+      endTime = System.currentTimeMillis() + timeoutInSeconds.get() * 1000;
+    }
+
+    while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) {
+      WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName);
+      if (workflowContext != null) {
+        org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(queueName, jobName));
+        if (helixJobState == org.apache.helix.task.TaskState.COMPLETED ||
+            helixJobState == org.apache.helix.task.TaskState.FAILED ||
+            helixJobState == org.apache.helix.task.TaskState.STOPPED) {
+          return;
+        }
+      }
+      Thread.sleep(1000);
+    }
+
+    // The loop can only be left when a timeout was supplied, so the Optional is present here.
+    // Print its value rather than the Optional wrapper itself.
+    throw new TimeoutException("task driver wait time [" + timeoutInSeconds.get() + " sec] is expired.");
+  }
+
+  /**
+   * Requests the given queue to stop and waits, polling once per second, until its workflow is no
+   * longer IN_PROGRESS or the timeout elapses.
+   *
+   * <p>Implemented here because fix
+   * https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4
+   * is not available in currently used version 0.6.9
+   *
+   * @param helixManager the Helix manager used to read the workflow context
+   * @param helixTaskDriver driver used to issue the stop request
+   * @param queueName name of the queue to stop
+   * @param timeoutInSeconds maximum time to wait for the queue to stop
+   * @throws InterruptedException if the waiting thread is interrupted while sleeping
+   */
+  public static void helixTaskDriverWaitToStop(
+      HelixManager helixManager,
+      TaskDriver helixTaskDriver,
+      String queueName,
+      long timeoutInSeconds) throws InterruptedException {
+    helixTaskDriver.stop(queueName);
+    long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
+    while (System.currentTimeMillis() <= endTime) {
+      WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName);
+      if (workflowContext == null || workflowContext.getWorkflowState()
+          .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) {
+        Thread.sleep(1000);
+      } else {
+        log.info("Successfully stopped the queue");
+        // Done: without returning here the loop would keep polling and logging, without sleeping,
+        // until the timeout elapsed.
+        return;
+      }
+    }
+    log.warn("Queue {} was not stopped within {} seconds", queueName, timeoutInSeconds);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 080adb0..03d4d42 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.cluster;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +33,7 @@ import com.typesafe.config.Config;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.MetricContext;
@@ -46,8 +48,10 @@ import org.apache.gobblin.util.ConfigUtils;
  * A list of {@link Service} : register any runtime services necessary to run 
the tasks.
  */
 @Slf4j
+@Alpha
 public abstract class TaskRunnerSuiteBase {
   protected TaskFactory taskFactory;
+  protected TaskFactory jobFactory;
   protected MetricContext metricContext;
   protected StandardMetricsBridge.StandardMetrics taskMetrics;
   protected List<Service> services = Lists.newArrayList();
@@ -62,7 +66,7 @@ public abstract class TaskRunnerSuiteBase {
 
   protected abstract StandardMetricsBridge.StandardMetrics getTaskMetrics();
 
-  protected abstract TaskFactory getTaskFactory();
+  protected abstract Map<String, TaskFactory> getTaskFactoryMap();
 
   protected abstract List<Service> getServices();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
index f54223f..bf21a4a 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java
@@ -18,10 +18,12 @@
 package org.apache.gobblin.cluster;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Service;
 
 import lombok.extern.slf4j.Slf4j;
@@ -51,8 +53,13 @@ class TaskRunnerSuiteProcessModel extends 
TaskRunnerSuiteBase {
   }
 
   @Override
-  protected TaskFactory getTaskFactory() {
-    return this.taskFactory;
+  protected Map<String, TaskFactory> getTaskFactoryMap() {
+    Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
+
+    taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME, 
taskFactory);
+
+    //TODO: taskFactoryMap.put(GOBBLIN_JOB_FACTORY_NAME, jobFactory);
+    return taskFactoryMap;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index fefa5b6..4f3a1e0 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -19,11 +19,13 @@ package org.apache.gobblin.cluster;
 
 import java.net.URI;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.helix.task.TaskFactory;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
@@ -46,6 +48,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
     super(builder);
     this.taskExecutor = new 
TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
     this.taskFactory = getInProcessTaskFactory(taskExecutor, builder);
+    this.jobFactory = new GobblinHelixJobFactory(builder);
     this.taskMetrics = new 
GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, 
metricContext);
   }
 
@@ -55,8 +58,11 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase 
{
   }
 
   @Override
-  protected TaskFactory getTaskFactory() {
-    return this.taskFactory;
+  protected Map<String, TaskFactory> getTaskFactoryMap() {
+    Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
+    taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME, 
taskFactory);
+    taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME, jobFactory);
+    return taskFactoryMap;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 408ebe2..6baaa42 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -24,6 +24,7 @@ import org.testng.annotations.Test;
 
 import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
 import 
org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite;
+import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
 import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
 import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite;
 
@@ -59,6 +60,13 @@ public class ClusterIntegrationTest {
     runAndVerify();
   }
 
+  /**
+   * Runs the cluster integration flow against {@link IntegrationJobFactorySuite}.
+   */
+  @Test
+  public void testPlanningJobFactory() throws Exception {
+    IntegrationJobFactorySuite jobFactorySuite = new IntegrationJobFactorySuite();
+    this.suite = jobFactorySuite;
+    runAndVerify();
+  }
+
   private void runAndVerify()
       throws Exception {
     suite.startCluster();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
new file mode 100644
index 0000000..1fdcda5
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * A thread-model task runner suite for tests that registers a {@link TestJobFactory} under
+ * {@link GobblinTaskRunner#GOBBLIN_JOB_FACTORY_NAME}, so that planning jobs are executed by
+ * {@link TestHelixJobTask}.
+ */
+public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel {
+  private final TaskFactory testJobFactory;
+
+  public TaskRunnerSuiteForJobFactoryTest(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) {
+    super(builder);
+    this.testJobFactory = new TestJobFactory(builder);
+  }
+
+  @Override
+  protected Map<String, TaskFactory> getTaskFactoryMap() {
+    Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
+    taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME, taskFactory);
+    taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME, testJobFactory);
+    return taskFactoryMap;
+  }
+
+  /** A job factory that creates {@link TestHelixJobTask} instances. */
+  public class TestJobFactory extends GobblinHelixJobFactory {
+    public TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) {
+      super (builder);
+    }
+
+    @Override
+    public Task createNewTask(TaskCallbackContext context) {
+      return new TestHelixJobTask(context, this.sysConfig, stateStores);
+    }
+  }
+
+  /** A job task that adds three customized key/value pairs to the persisted result. */
+  public class TestHelixJobTask extends GobblinHelixJobTask {
+    public TestHelixJobTask(TaskCallbackContext context,
+        Config sysConfig,
+        StateStores stateStores) {
+      super(context, sysConfig, stateStores);
+    }
+
+    //TODO: change below to Helix UserContentStore
+    @Override
+    protected void setResultToUserContent(Map<String, String> keyValues) throws IOException {
+      Map<String, String> customizedKVs = Maps.newHashMap(keyValues);
+      customizedKVs.put("customizedKey_1", "customizedVal_1");
+      customizedKVs.put("customizedKey_2", "customizedVal_2");
+      customizedKVs.put("customizedKey_3", "customizedVal_3");
+      super.setResultToUserContent(customizedKVs);
+    }
+  }
+
+  /**
+   * A distributed execution launcher that verifies the result written by {@link TestHelixJobTask}
+   * and then flags {@link IntegrationJobFactorySuite#completed}.
+   */
+  @Slf4j
+  public static class TestDistributedExecutionLauncher extends GobblinHelixDistributeJobExecutionLauncher {
+
+    public TestDistributedExecutionLauncher(GobblinHelixDistributeJobExecutionLauncher.Builder builder) throws Exception {
+      super(builder);
+    }
+
+    //TODO: change below to Helix UserContentStore
+    @Override
+    protected DistributeJobResult getResultFromUserContent() {
+      DistributeJobResult rst = super.getResultFromUserContent();
+      // Fail with an assertion instead of an exception from get() when no properties came back
+      Assert.assertTrue(rst.getProperties().isPresent());
+      Properties properties = rst.getProperties().get();
+      Assert.assertTrue(properties.containsKey(Partitioner.IS_EARLY_STOPPED));
+      Assert.assertFalse(PropertiesUtils.getPropAsBoolean(properties, Partitioner.IS_EARLY_STOPPED, "false"));
+
+      // assertEquals reports the actual value and does not NPE when a key is missing
+      Assert.assertEquals(properties.getProperty("customizedKey_1"), "customizedVal_1");
+      Assert.assertEquals(properties.getProperty("customizedKey_2"), "customizedVal_2");
+      Assert.assertEquals(properties.getProperty("customizedKey_3"), "customizedVal_3");
+      IntegrationJobFactorySuite.completed.set(true);
+      return rst;
+    }
+
+
+    @Alias("TestDistributedExecutionLauncherBuilder")
+    public static class Builder extends GobblinHelixDistributeJobExecutionLauncher.Builder {
+      @Override
+      public TestDistributedExecutionLauncher build() throws Exception {
+        return new TestDistributedExecutionLauncher(this);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
index 5f3b2fe..751c314 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java
@@ -25,6 +25,8 @@ import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
 import org.testng.Assert;
 
+import com.google.common.collect.Maps;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
@@ -45,8 +47,10 @@ public class TaskRunnerSuiteForJobTagTest extends 
TaskRunnerSuiteThreadModel {
   }
 
   @Override
-  protected TaskFactory getTaskFactory() {
-    return this.jobTagTestFactory;
+  // Builds a new map with a single entry: jobTagTestFactory registered under
+  // GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME.
+  protected Map<String, TaskFactory> getTaskFactoryMap() {
+    Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
+    taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME, 
jobTagTestFactory);
+    return taskFactoryMap;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index bab8b30..eff11c8 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -17,9 +17,15 @@
 
 package org.apache.gobblin.cluster.suite;
 
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -32,10 +38,15 @@ import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.assertj.core.util.Lists;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Resources;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigRenderOptions;
+import com.typesafe.config.ConfigSyntax;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -124,10 +135,32 @@ public class IntegrationBasicSuite {
     }
   }
 
-  protected void copyJobConfFromResource() throws IOException {
+  /**
+   * Parses the job configuration resource, lets {@link #overrideJobConfigs(Config)}
+   * derive the per-job configs from it, and writes each one through
+   * {@link #writeJobConf(String, Config)}.
+   *
+   * <p>A failure to write one job's config is logged and does not stop the remaining
+   * jobs from being written.
+   *
+   * @throws IOException if the resource stream cannot be opened or closed
+   */
+  private void copyJobConfFromResource() throws IOException {
+    Map<String, Config> jobConfigs;
     try (InputStream resourceStream = this.jobConfResourceUrl.openStream()) {
-      File targetFile = new File(this.jobConfigPath + "/" + JOB_CONF_NAME);
-      FileUtils.copyInputStreamToFile(resourceStream, targetFile);
+      // Decode explicitly as UTF-8 (the charset writeJobConf writes with) instead of
+      // relying on the platform default charset.
+      Reader reader = new InputStreamReader(resourceStream, Charsets.UTF_8);
+      Config rawJobConfig = ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF));
+      jobConfigs = overrideJobConfigs(rawJobConfig);
+      jobConfigs.forEach((jobName, jobConfig) -> {
+        try {
+          writeJobConf(jobName, jobConfig);
+        } catch (IOException e) {
+          // Best effort per job, but keep the cause so the failure can be diagnosed.
+          log.error("Job " + jobName + " config cannot be written.", e);
+        }
+      });
+    }
+  }
+
+  /**
+   * Hook that maps job names to the configs to be written for them. This default
+   * yields a single job, "HelloWorldJob", backed by the raw config as-is.
+   *
+   * @param rawJobConfig the config parsed from the job configuration resource
+   * @return job name to job config
+   */
+  protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
+    Map<String, Config> singleJob = ImmutableMap.of("HelloWorldJob", rawJobConfig);
+    return singleJob;
+  }
+
+  /**
+   * Renders {@code jobConfig} with the default render options and writes the text,
+   * encoded as UTF-8, to {@code <jobConfigPath>/<jobName>.conf}.
+   *
+   * @param jobName   base name of the target file (".conf" is appended)
+   * @param jobConfig config to render
+   * @throws IOException if the file cannot be opened or written
+   */
+  private void writeJobConf(String jobName, Config jobConfig) throws IOException {
+    String confFilePath = this.jobConfigPath + "/" + jobName + ".conf";
+    ConfigRenderOptions renderOptions = ConfigRenderOptions.defaults();
+    String confText = jobConfig.root().render(renderOptions);
+    try (DataOutputStream fileStream = new DataOutputStream(new FileOutputStream(confFilePath));
+        Writer confWriter = new OutputStreamWriter(fileStream, Charsets.UTF_8)) {
+      confWriter.write(confText);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
new file mode 100644
index 0000000..0487c9c
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.suite;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.testng.collections.Lists;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
+import org.apache.gobblin.cluster.TaskRunnerSuiteForJobFactoryTest;
+
+/**
+ * Integration suite whose single job, "HelloWorldJob", sets
+ * {@code DISTRIBUTED_JOB_LAUNCHER_ENABLED} to {@code true} and names
+ * "TestDistributedExecutionLauncherBuilder" as the launcher builder, and whose worker
+ * config names "TestJobFactorySuiteBuilder" as the task runner suite builder.
+ */
+@Slf4j
+public class IntegrationJobFactorySuite extends IntegrationBasicSuite {
+
+  /**
+   * Completion flag polled by {@link #waitForAndVerifyOutputFiles()}. This class only
+   * reads it; it is expected to be set to {@code true} by test code elsewhere.
+   */
+  public static final AtomicBoolean completed = new AtomicBoolean(false);
+
+  /**
+   * Produces the config for the single job "HelloWorldJob".
+   *
+   * <p>NOTE(review): {@code rawJobConfig} is not merged in (there is no
+   * {@code withFallback(rawJobConfig)}), so the written job config holds only the two
+   * keys set here - confirm this is intentional.
+   */
+  @Override
+  protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
+    Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
+        GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true,
+        GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder"));
+    return ImmutableMap.of("HelloWorldJob", newConfig);
+  }
+
+  /**
+   * Takes the first worker config of the parent suite and overrides its
+   * {@code TASK_RUNNER_SUITE_BUILDER} with "TestJobFactorySuiteBuilder"; only that
+   * single config is returned.
+   */
+  @Override
+  public Collection<Config> getWorkerConfigs() {
+    Config rawConfig = super.getWorkerConfigs().iterator().next();
+    Config workerConfig = ConfigFactory.parseMap(
+        ImmutableMap.of(GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, "TestJobFactorySuiteBuilder"))
+        .withFallback(rawConfig);
+
+    return Lists.newArrayList(workerConfig);
+  }
+
+  /**
+   * Polls {@link #completed} once per second and returns as soon as it is {@code true}.
+   * No output files are inspected here.
+   *
+   * <p>NOTE(review): there is no timeout; this blocks indefinitely if the flag is
+   * never set.
+   */
+  @Override
+  public void waitForAndVerifyOutputFiles() throws Exception {
+    while (true) {
+      Thread.sleep(1000);
+      if (completed.get()) {
+        break;
+      }
+      log.info("Waiting for job to be finished");
+    }
+  }
+
+  /** Builder, registered under an alias, that creates {@link TaskRunnerSuiteForJobFactoryTest} suites. */
+  @Alias("TestJobFactorySuiteBuilder")
+  public static class TestJobFactorySuiteBuilder extends TaskRunnerSuiteBase.Builder {
+    public TestJobFactorySuiteBuilder(Config config) {
+      super(config);
+    }
+
+    @Override
+    public TaskRunnerSuiteBase build() {
+      return new TaskRunnerSuiteForJobFactoryTest(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
index adaf702..4424cae 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java
@@ -17,31 +17,20 @@
 
 package org.apache.gobblin.cluster.suite;
 
-import java.io.DataOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.testng.collections.Lists;
+import org.testng.collections.Maps;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import com.typesafe.config.ConfigRenderOptions;
-import com.typesafe.config.ConfigSyntax;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -136,41 +125,24 @@ public class IntegrationJobTagSuite extends 
IntegrationBasicSuite {
    * Create different jobs with different tags
    */
   @Override
-  protected void copyJobConfFromResource() throws IOException {
-    try (InputStream resourceStream = this.jobConfResourceUrl.openStream()) {
-      Reader reader = new InputStreamReader(resourceStream);
-      Config jobConfig = ConfigFactory.parseReader(reader, 
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF));
-      for(Map.Entry<String, String> assoc: JOB_TAG_ASSOCIATION.entrySet()) {
-        generateJobConf(jobConfig,assoc.getKey(),assoc.getValue());
-      }
+  protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
+    // One config per JOB_TAG_ASSOCIATION entry: the entry key is passed on as the job
+    // name (and is also the key of the returned map), the entry value as the job tag.
+    Map<String, Config> jobConfigs = Maps.newHashMap();
+    for(Map.Entry<String, String> assoc: JOB_TAG_ASSOCIATION.entrySet()) {
+      Config newConfig = getConfigOverride(rawJobConfig, assoc.getKey(), 
assoc.getValue());
+      jobConfigs.put(assoc.getKey(), newConfig);
     }
+    return jobConfigs;
   }
 
-  private void generateJobConf(Config jobConfig, String jobName, String tag) 
throws IOException {
-    Config newConfig = addJobTag(jobConfig, tag);
-    newConfig = getConfigOverride(newConfig, jobName);
-
-    String targetPath = this.jobConfigPath + "/" + jobName + ".conf";
-    String renderedConfig = 
newConfig.root().render(ConfigRenderOptions.defaults());
-    try (DataOutputStream os = new DataOutputStream(new 
FileOutputStream(targetPath));
-        Writer writer = new OutputStreamWriter(os, Charsets.UTF_8)) {
-      writer.write(renderedConfig);
-    }
-  }
-
-  private Config getConfigOverride(Config config, String jobName) {
+  /**
+   * Returns a config that sets the Helix job tag, the job name, and the publisher
+   * final dir ({@code jobOutputBasePath + "/" + jobName}), falling back to
+   * {@code config} for every other key.
+   */
+  private Config getConfigOverride(Config config, String jobName, String 
jobTag) {
     Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
+        GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY, jobTag,
         ConfigurationKeys.JOB_NAME_KEY, jobName,
         ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, this.jobOutputBasePath + 
"/" + jobName))
         .withFallback(config);
     return newConfig;
   }
 
-  private Config addJobTag(Config jobConfig, String jobTag) {
-    return 
ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY,
 jobTag))
-        .withFallback(jobConfig);
-  }
-
   @Override
   public void waitForAndVerifyOutputFiles() throws Exception {
     AssertWithBackoff asserter = 
AssertWithBackoff.create().logger(log).timeoutMs(60_000)

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index fb1e590..4ab6db8 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -61,6 +61,10 @@ public class PropertiesUtils {
     return Boolean.valueOf(properties.getProperty(key, defaultValue));
   }
 
+  /**
+   * Reads a property as a {@code long}.
+   *
+   * @param properties   the properties to read from
+   * @param key          the property key
+   * @param defaultValue value returned when {@code key} is not present
+   * @return the parsed property value, or {@code defaultValue} if the key is absent
+   * @throws NumberFormatException if the property value is not a parsable {@code long}
+   */
+  public static long getPropAsLong(Properties properties, String key, long defaultValue) {
+    // parseLong yields the primitive directly; valueOf would box and then unbox.
+    return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue)));
+  }
+
   /**
    * Extract all the keys that start with a <code>prefix</code> in {@link 
Properties} to a new {@link Properties}
    * instance.

Reply via email to