Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ccaa02c6e -> 11a1c46ab


[GOBBLIN-647] Move early stop logic to task driver instance.

Closes #2517 from kyuamazon/td


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/11a1c46a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/11a1c46a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/11a1c46a

Branch: refs/heads/master
Commit: 11a1c46ab9907c403f384b51a81f56c2d281ccaf
Parents: ccaa02c
Author: Kuai Yu <[email protected]>
Authored: Fri Dec 7 18:35:54 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Fri Dec 7 18:35:54 2018 -0800

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |   6 +-
 ...blinHelixDistributeJobExecutionLauncher.java |  58 ++-----
 .../gobblin/cluster/GobblinHelixJobFactory.java |  26 ++--
 .../cluster/GobblinHelixJobLauncher.java        |   6 +-
 .../gobblin/cluster/GobblinHelixJobTask.java    |  80 ++++++----
 .../gobblin/cluster/HelixJobsMapping.java       | 150 +++++++++++++++++++
 .../cluster/HelixRetriggeringJobCallable.java   | 130 +++++++++-------
 .../org/apache/gobblin/cluster/HelixUtils.java  |  25 ++++
 .../TaskRunnerSuiteForJobFactoryTest.java       |  41 ++---
 9 files changed, 356 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 00a2e91..873793f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -126,5 +126,9 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_JOB_TIMEOUT_SECONDS = 
"helix.job.timeout.seconds";
   public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
   public static final String HELIX_TASK_TIMEOUT_SECONDS = 
"helix.task.timeout.seconds";
-  public static final String HELIX_MAX_TASK_RETRIES_KEY = 
"helix.task.maxretries";
+  public static final String HELIX_TASK_MAX_ATTEMPTS_KEY = 
"helix.task.maxAttempts";
+
+  public static final String HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "workflowDeleteTimeoutSeconds";
+  public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index e01ab28..609639e 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.cluster;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -36,11 +35,9 @@ import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -50,19 +47,16 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.api.ExecutionResult;
 import org.apache.gobblin.runtime.api.JobExecutionLauncher;
 import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.MonitoredObject;
-import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
 
@@ -92,11 +86,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
   protected TaskDriver helixTaskDriver;
   protected Properties sysProps;
   protected Properties jobPlanningProps;
-  protected StateStores stateStores;
-
-  protected static final String PLANNING_WORK_UNIT_DIR_NAME = 
"_plan_workunits";
-  protected static final String PLANNING_TASK_STATE_DIR_NAME = 
"_plan_taskstates";
-  protected static final String PLANNING_JOB_STATE_DIR_NAME = 
"_plan_jobstates";
+  protected HelixJobsMapping jobsMapping;
 
   protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps.";
 
@@ -123,15 +113,9 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
     Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
         .withFallback(ConfigUtils.propertiesToConfig(sysProps));
 
-    Config stateStoreJobConfig = combined
-        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-            new URI(builder.appWorkDir.toUri().getScheme(), null, 
builder.appWorkDir.toUri().getHost(),
-                builder.appWorkDir.toUri().getPort(), null, null, 
null).toString()));
-
-    this.stateStores = new StateStores(stateStoreJobConfig,
-        builder.appWorkDir, PLANNING_TASK_STATE_DIR_NAME,
-        builder.appWorkDir, PLANNING_WORK_UNIT_DIR_NAME,
-        builder.appWorkDir, PLANNING_JOB_STATE_DIR_NAME);
+    this.jobsMapping = new HelixJobsMapping(combined,
+                                            
PathUtils.getRootPath(builder.appWorkDir).toUri(),
+                                            builder.appWorkDir.toString());
 
     this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined,
         GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
@@ -144,17 +128,17 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
 
   private void executeCancellation() {
     if (this.jobSubmitted) {
-      String planningName = getPlanningJobId(this.jobPlanningProps);
+      String planningJobId = getPlanningJobId(this.jobPlanningProps);
       try {
         if (this.cancellationRequested && !this.cancellationExecuted) {
           // TODO : fix this when HELIX-1180 is completed
           // work flow should never be deleted explicitly because it has a 
expiry time
           // If cancellation is requested, we should set the job state to 
CANCELLED/ABORT
-          this.helixTaskDriver.waitToStop(planningName, 10000L);
-          log.info("Stopped the workflow ", planningName);
+          this.helixTaskDriver.waitToStop(planningJobId, 10000L);
+          log.info("Stopped the workflow ", planningJobId);
         }
       } catch (InterruptedException e) {
-        throw new RuntimeException("Failed to stop workflow " + planningName + 
" in Helix", e);
+        throw new RuntimeException("Failed to stop workflow " + planningJobId 
+ " in Helix", e);
       }
     }
   }
@@ -262,7 +246,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
         return waitForJobCompletion(planningId, planningId);
       } catch (Exception e) {
         log.error(planningId + " is not able to submit.");
-        return new DistributeJobResult(Optional.empty(), Optional.of(e));
+        return new DistributeJobResult(false);
       }
     }
   }
@@ -283,35 +267,19 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
     } catch (TimeoutException te) {
       HelixUtils.handleJobTimeout(workFlowName, jobName,
           helixManager, this, null);
-      return new DistributeJobResult(Optional.empty(), Optional.of(te));
+      return new DistributeJobResult(false);
     }
   }
 
-  //TODO: change below to Helix UserConentStore
-  @VisibleForTesting
+  //TODO: remove this logic once we switch to a polling model.
   protected DistributeJobResult getResultFromUserContent() {
-    String planningId = getPlanningJobId(this.jobPlanningProps);
-    try {
-      TaskState taskState = 
this.stateStores.getTaskStateStore().get(planningId, planningId, planningId);
-      return new DistributeJobResult(Optional.of(taskState.getProperties()), 
Optional.empty());
-    } catch (IOException e) {
-      return new DistributeJobResult(Optional.empty(), Optional.of(e));
-    }
+    return new DistributeJobResult(false);
   }
 
   @Getter
   @AllArgsConstructor
   static class DistributeJobResult implements ExecutionResult {
     boolean isEarlyStopped = false;
-    Optional<Properties> properties;
-    Optional<Throwable> throwable;
-    public DistributeJobResult(Optional<Properties> properties, 
Optional<Throwable> throwable) {
-      this.properties = properties;
-      this.throwable = throwable;
-      if (properties.isPresent()) {
-        isEarlyStopped = 
PropertiesUtils.getPropAsBoolean(this.properties.get(), 
Partitioner.IS_EARLY_STOPPED, "false");
-      }
-    }
   }
 
   private class DistributeJobMonitor extends FutureTask<ExecutionResult> 
implements JobExecutionMonitor {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
index 5bee4e0..f631abf 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java
@@ -25,24 +25,23 @@ import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
 
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
 
 /**
- * An implementation of Helix's {@link TaskFactory} for {@link 
GobblinHelixJobTask}s.
+ * <p> A {@link TaskFactory} that creates {@link GobblinHelixJobTask}
+ * to run task driver logic.
  */
 @Slf4j
 class GobblinHelixJobFactory implements TaskFactory {
-  protected StateStores stateStores;
+  protected HelixJobsMapping jobsMapping;
 
   protected TaskRunnerSuiteBase.Builder builder;
   @Getter
@@ -50,25 +49,20 @@ class GobblinHelixJobFactory implements TaskFactory {
   @Getter
   protected GobblinHelixJobTask.GobblinHelixJobTaskMetrics jobTaskMetrics;
 
-  private void initializeStateStore(TaskRunnerSuiteBase.Builder builder) {
+  private void initJobMapping(TaskRunnerSuiteBase.Builder builder) {
     Config sysConfig = builder.getConfig();
     Path appWorkDir = builder.getAppWorkPath();
     URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri();
-    Config stateStoreJobConfig = sysConfig
-        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
-            ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
-
-    this.stateStores = new StateStores(stateStoreJobConfig,
-        appWorkDir, 
GobblinHelixDistributeJobExecutionLauncher.PLANNING_TASK_STATE_DIR_NAME,
-        appWorkDir, 
GobblinHelixDistributeJobExecutionLauncher.PLANNING_WORK_UNIT_DIR_NAME,
-        appWorkDir, 
GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME);
+    this.jobsMapping = new HelixJobsMapping(sysConfig,
+                                            rootPathUri,
+                                            appWorkDir.toString());
   }
 
   public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder, 
MetricContext metricContext) {
 
     this.builder = builder;
-    // TODO: We can remove below initialization once Helix allow us to persist 
job resut in userContentStore
-    initializeStateStore(this.builder);
+    initJobMapping(this.builder);
+
     // initialize job related metrics (planning jobs)
     int metricsWindowSizeInMin = ConfigUtils.getInt(builder.getConfig(),
         ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
@@ -84,7 +78,7 @@ class GobblinHelixJobFactory implements TaskFactory {
   @Override
   public Task createNewTask(TaskCallbackContext context) {
     return new GobblinHelixJobTask(context,
-        this.stateStores,
+        this.jobsMapping,
         this.builder,
         this.launcherMetrics,
         this.jobTaskMetrics);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 8d6d7b2..5798da0 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -184,6 +184,10 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     }
   }
 
+  public String getJobId() {
+    return this.jobContext.getJobId();
+  }
+
   @Override
   protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
     try {
@@ -270,7 +274,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
     // Helix task attempts = retries + 1 (fallback to general task retry for 
backward compatibility)
     
jobConfigBuilder.setMaxAttemptsPerTask(this.jobContext.getJobState().getPropAsInt(
-        GobblinClusterConfigurationKeys.HELIX_MAX_TASK_RETRIES_KEY,
+        GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY,
         this.jobContext.getJobState().getPropAsInt(
             ConfigurationKeys.MAX_TASK_RETRIES_KEY,
             ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 2f5bcdf..0266413 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -17,23 +17,22 @@
 
 package org.apache.gobblin.cluster;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskResult;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
@@ -41,17 +40,15 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
+
 
 /**
  * An implementation of Helix's {@link org.apache.helix.task.Task} that runs 
original {@link GobblinHelixJobLauncher}.
@@ -62,7 +59,7 @@ class GobblinHelixJobTask implements Task {
   private final TaskConfig taskConfig;
   private final Config sysConfig;
   private final Properties jobPlusSysConfig;
-  private final StateStores stateStores;
+  private final HelixJobsMapping jobsMapping;
   private final String planningJobId;
   private final HelixManager helixManager;
   private final Path appWorkDir;
@@ -72,7 +69,7 @@ class GobblinHelixJobTask implements Task {
   private GobblinHelixJobLauncherListener jobLauncherListener;
 
   public GobblinHelixJobTask (TaskCallbackContext context,
-                              StateStores stateStores,
+                              HelixJobsMapping jobsMapping,
                               TaskRunnerSuiteBase.Builder builder,
                               GobblinHelixJobLauncherMetrics launcherMetrics,
                               GobblinHelixJobTaskMetrics jobTaskMetrics) {
@@ -96,7 +93,7 @@ class GobblinHelixJobTask implements Task {
     }
 
     this.planningJobId = 
jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
-    this.stateStores = stateStores;
+    this.jobsMapping = jobsMapping;
     this.appWorkDir = builder.getAppWorkPath();
     this.metadataTags = Tag.fromMap(new ImmutableMap.Builder<String, Object>()
         .put(GobblinClusterMetricTagNames.APPLICATION_NAME, 
builder.getApplicationName())
@@ -117,7 +114,7 @@ class GobblinHelixJobTask implements Task {
     public void updateTimeBetweenJobSubmissionAndExecution(Properties 
jobProps) {
       long jobSubmitTime = 
Long.parseLong(jobProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME,
 "0"));
       if (jobSubmitTime != 0) {
-        
Instrumented.updateTimer(Optional.of(this.timeBetweenJobSubmissionAndExecution),
+        
Instrumented.updateTimer(com.google.common.base.Optional.of(this.timeBetweenJobSubmissionAndExecution),
             System.currentTimeMillis() - jobSubmitTime,
             TimeUnit.MILLISECONDS);
       }
@@ -140,10 +137,52 @@ class GobblinHelixJobTask implements Task {
   public TaskResult run() {
     log.info("Running planning job {}", this.planningJobId);
     
this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
+
     try (Closer closer = Closer.create()) {
-      this.launcher = createJobLauncher();
-      closer.register(launcher).launchJob(this.jobLauncherListener);
-      setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, 
"false"));
+      String jobName = 
jobPlusSysConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+
+      Optional<String> planningIdFromStateStore = 
this.jobsMapping.getPlanningJobId(jobName);
+
+      long timeOut = PropertiesUtils.getPropAsLong(jobPlusSysConfig,
+                                         
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS,
+                                         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS) 
* 1000;
+
+      if (planningIdFromStateStore.isPresent() && 
!planningIdFromStateStore.get().equals(this.planningJobId)) {
+        return new TaskResult(TaskResult.Status.FAILED, "Exception occurred 
for job " + planningJobId
+            + ": because planning job in state store has different id (" + 
planningIdFromStateStore.get() + ")");
+      }
+
+      while (true) {
+        Optional<String> actualJobIdFromStateStore = 
this.jobsMapping.getActualJobId(jobName);
+        if (actualJobIdFromStateStore.isPresent()) {
+          String previousActualJobId = actualJobIdFromStateStore.get();
+          if (HelixUtils.isJobFinished(previousActualJobId, 
previousActualJobId, this.helixManager)) {
+            log.info("Previous actual job {} [plan: {}] finished, will launch 
a new job.", previousActualJobId, this.planningJobId);
+          } else {
+            log.info("Previous actual job {} [plan: {}] not finished, kill it 
now.", previousActualJobId, this.planningJobId);
+            try {
+              HelixUtils.deleteWorkflow(previousActualJobId, 
this.helixManager, timeOut);
+            } catch (HelixException e) {
+              log.error("Helix cannot delete previous actual job id {} within 
5 min.", previousActualJobId);
+              return new TaskResult(TaskResult.Status.FAILED, 
ExceptionUtils.getFullStackTrace(e));
+            }
+          }
+        } else {
+          log.info("Actual job {} does not exist. First time run.", 
this.planningJobId);
+        }
+
+        this.launcher = createJobLauncher();
+
+        this.jobsMapping.setActualJobId(jobName, this.planningJobId, 
this.launcher.getJobId());
+
+        closer.register(launcher).launchJob(this.jobLauncherListener);
+
+        if (!this.launcher.isEarlyStopped()) {
+          break;
+        } else {
+          log.info("Planning job {} has more runs due to early stop.", 
this.planningJobId);
+        }
+      }
     } catch (Exception e) {
       return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for 
job " + planningJobId + ":" + ExceptionUtils
           .getFullStackTrace(e));
@@ -151,19 +190,6 @@ class GobblinHelixJobTask implements Task {
     return new TaskResult(TaskResult.Status.COMPLETED, "");
   }
 
-  //TODO: change below to Helix UserConentStore
-  @VisibleForTesting
-  protected void setResultToUserContent(Map<String, String> keyValues) throws 
IOException {
-    WorkUnitState wus = new WorkUnitState();
-    wus.setProp(ConfigurationKeys.JOB_ID_KEY, this.planningJobId);
-    wus.setProp(ConfigurationKeys.TASK_ID_KEY, this.planningJobId);
-    wus.setProp(ConfigurationKeys.TASK_KEY_KEY, this.planningJobId);
-    keyValues.forEach((key, value)->wus.setProp(key, value));
-    TaskState taskState = new TaskState(wus);
-
-    this.stateStores.getTaskStateStore().put(this.planningJobId, 
this.planningJobId, taskState);
-  }
-
   @Override
   public void cancel() {
     log.info("Cancelling planning job {}", this.planningJobId);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
new file mode 100644
index 0000000..8c2a836
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Optional;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import javax.annotation.Nullable;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * <p> Any job that is submitted to Helix will have a unique id.
+ * We need some mapping between the job name and the job id,
+ * in order to perform:
+ *
+ *  1) cancel a running job
+ *  2) delete a running job
+ *  3) block any incoming job with same name.
+ *
+ * <p> More complexity comes into the picture in the distributed
+ * task driver mode, where a job name maps to a planning job id,
+ * which in turn maps to an actual job id.
+ *
+ * <p> The mapping is persisted through a {@link StateStore}: each job
+ * name is used as the table name within the distributed state store, and
+ * the planning job id and actual job id are saved in the state object.
+ */
+public class HelixJobsMapping {
+
+  public static final String JOBS_MAPPING_DB_TABLE_KEY = 
"jobs.mapping.db.table.key";
+  public static final String DEFAULT_JOBS_MAPPING_DB_TABLE_KEY_NAME = 
"JobsMapping";
+
+  public static final String DISTRIBUTED_STATE_STORE_NAME_KEY = 
"jobs.mapping.distributed.state.store.name";
+  public static final String DEFAULT_DISTRIBUTED_STATE_STORE_NAME = 
"distributedState";
+ 
+  private StateStore stateStore;
+  private String distributedStateStoreName;
+
+  public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) {
+    String stateStoreType = ConfigUtils.getString(sysConfig,
+                                                  
ConfigurationKeys.INTERMEDIATE_STATE_STORE_TYPE_KEY,
+                                                  
ConfigUtils.getString(sysConfig,
+                                                                        
ConfigurationKeys.STATE_STORE_TYPE_KEY,
+                                                                        
ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
+
+    ClassAliasResolver<StateStore.Factory> resolver =
+        new ClassAliasResolver<>(StateStore.Factory.class);
+    StateStore.Factory stateStoreFactory;
+
+    try {
+      stateStoreFactory = resolver.resolveClass(stateStoreType).newInstance();
+    } catch (ClassNotFoundException cnfe) {
+      throw new RuntimeException(cnfe);
+    } catch (InstantiationException ie) {
+      throw new RuntimeException(ie);
+    } catch (IllegalAccessException iae) {
+      throw new RuntimeException(iae);
+    }
+
+    String dbTableKey = ConfigUtils.getString(sysConfig, 
JOBS_MAPPING_DB_TABLE_KEY, DEFAULT_JOBS_MAPPING_DB_TABLE_KEY_NAME);
+    this.distributedStateStoreName = ConfigUtils.getString(sysConfig, 
DISTRIBUTED_STATE_STORE_NAME_KEY, DEFAULT_DISTRIBUTED_STATE_STORE_NAME);
+
+    Config stateStoreJobConfig = sysConfig
+        .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(fsUri.toString()))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, 
ConfigValueFactory.fromAnyRef(dbTableKey));
+
+    this.stateStore = stateStoreFactory.createStateStore(stateStoreJobConfig, 
State.class);
+  }
+
+  @Nullable
+  private State getOrCreate (String storeName, String jobName) throws 
IOException {
+    if (this.stateStore.exists(storeName, jobName)) {
+      return this.stateStore.get(storeName, jobName, jobName);
+    }
+    return new State();
+  }
+
+  private void delete (String storeName, String jobName) throws IOException {
+    this.stateStore.delete(storeName, jobName);
+  }
+
+  public void setPlanningJobId (String jobName, String planningJobId) throws 
IOException {
+    State state = getOrCreate(distributedStateStoreName, jobName);
+    state.setId(jobName);
+    state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningJobId);
+    // FsStateStore writes via an HDFS rename, which assumes the target file does not already exist.
+    if (stateStore instanceof FsStateStore) {
+      this.delete(distributedStateStoreName, jobName);
+    }
+    this.stateStore.put(distributedStateStoreName, jobName, state);
+  }
+
+  public void setActualJobId (String jobName, String planningJobId, String 
actualJobId) throws IOException {
+    State state = getOrCreate(distributedStateStoreName, jobName);
+    state.setId(jobName);
+    state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningJobId);
+    state.setProp(ConfigurationKeys.JOB_ID_KEY, actualJobId);
+    // FsStateStore writes via an HDFS rename, which assumes the target file does not already exist.
+    if (stateStore instanceof FsStateStore) {
+      this.delete(distributedStateStoreName, jobName);
+    }
+    this.stateStore.put(distributedStateStoreName, jobName, state);
+  }
+
+  private Optional<String> getId (String jobName, String idName) throws 
IOException {
+    State state = this.stateStore.get(distributedStateStoreName, jobName, 
jobName);
+    if (state == null) {
+      return Optional.empty();
+    }
+
+    String id = state.getProp(idName);
+
+    return id == null? Optional.empty() : Optional.of(id);
+  }
+
+  public Optional<String> getActualJobId (String jobName) throws IOException {
+    return getId(jobName, ConfigurationKeys.JOB_ID_KEY);
+  }
+
+  public Optional<String> getPlanningJobId (String jobName) throws IOException 
{
+    return getId(jobName, GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 6b400d3..f9c70f7 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.cluster;
 
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 
@@ -31,11 +32,12 @@ import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.api.ExecutionResult;
 import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -50,9 +52,9 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  *
  * <p> Non-Distribution Mode:
  *    If {@link 
GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the 
job will be handled
- *    by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which 
simply launches {@link GobblinHelixJobLauncher}
+ *    by {@link HelixRetriggeringJobCallable#runJobLauncherLoop()}, which 
simply launches {@link GobblinHelixJobLauncher}
  *    and submit the work units to Helix. Helix will dispatch the work units 
to different worker nodes. The worker node will
- *    handle the work units by {@link GobblinHelixTask}.
+ *    handle the work units by launching {@link GobblinHelixTask}.
  *
  *    See {@link GobblinHelixJobLauncher} for job launcher details.
  *    See {@link GobblinHelixTask} for work unit handling details.
@@ -60,13 +62,14 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  *
  * <p> Distribution Mode:
  *   If {@link 
GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is true, the 
job will be handled
- *   by {@link 
HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}}, which simply 
launches
+ *   by {@link HelixRetriggeringJobCallable#runJobExecutionLauncher()}, which 
simply launches
  *   {@link GobblinHelixDistributeJobExecutionLauncher} and submit a planning 
job to Helix. Helix will dispatch this
- *   planning job to a worker node. The worker node will handle this planning 
job by {@link GobblinHelixJobTask}.
+ *   planning job to a task-driver node. The task-driver node will handle this planning job by launching
+ *   {@link GobblinHelixJobTask}.
  *
- *   The {@link GobblinHelixJobTask} will launch {@link 
GobblinHelixJobLauncher} and it will again submit the actual
- *   work units to Helix. Helix will dispatch the work units to other worker 
nodes. Similar to Non-Distribution Node,
- *   some worker nodes will handle those work units by {@link 
GobblinHelixTask}.
+ *   The {@link GobblinHelixJobTask} will again launch {@link GobblinHelixJobLauncher} to submit the actual job
+ *   to Helix. Helix will dispatch the work units to other worker nodes. As in Non-Distribution Mode,
+ *   some worker nodes will handle those work units by launching {@link GobblinHelixTask}.
  *
  *    See {@link GobblinHelixDistributeJobExecutionLauncher} for planning job 
launcher details.
  *    See {@link GobblinHelixJobTask} for planning job handling details.
@@ -83,7 +86,7 @@ class HelixRetriggeringJobCallable implements Callable {
   private final JobListener jobListener;
   private final Path appWorkDir;
   private final HelixManager helixManager;
-
+  protected HelixJobsMapping jobsMapping;
   private GobblinHelixJobLauncher currentJobLauncher = null;
   private JobExecutionMonitor currentJobMonitor = null;
   private boolean isDistributeJobEnabled = false;
@@ -102,6 +105,9 @@ class HelixRetriggeringJobCallable implements Callable {
     this.appWorkDir = appWorkDir;
     this.helixManager = helixManager;
     this.isDistributeJobEnabled = isDistributeJobEnabled();
+    this.jobsMapping = new 
HelixJobsMapping(ConfigUtils.propertiesToConfig(sysProps),
+                                            
PathUtils.getRootPath(appWorkDir).toUri(),
+                                            appWorkDir.toString());
   }
 
   private boolean isRetriggeringEnabled() {
@@ -121,18 +127,25 @@ class HelixRetriggeringJobCallable implements Callable {
+  /**
+   * Runs the job through the distributed path ({@link #runJobExecutionLauncher()}) when
+   * distributed job launching is enabled, otherwise through {@link #runJobLauncherLoop()}.
+   *
+   * @return always {@code null}
+   * @throws JobException propagated from whichever launcher path is taken
+   */
   @Override
   public Void call() throws JobException {
     if (this.isDistributeJobEnabled) {
-      launchJobExecutionLauncherLoop();
+      runJobExecutionLauncher();
     } else {
-      launchJobLauncherLoop();
+      runJobLauncherLoop();
     }
 
     return null;
   }
 
-  private void launchJobLauncherLoop() throws JobException {
+  /**
+   * <p> In some cases, the job launcher stops early, for example
+   * because of a large volume of input source data. In such a case,
+   * the same job is re-launched (when retriggering is enabled) until
+   * the job launcher determines it is safe to stop.
+   */
+  private void runJobLauncherLoop() throws JobException {
     try {
       while (true) {
         currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps);
+        // in "run once" case, job scheduler will remove current job from the 
scheduler
         boolean isEarlyStopped = this.jobScheduler.runJob(jobProps, 
jobListener, currentJobLauncher);
         boolean isRetriggerEnabled = this.isRetriggeringEnabled();
         if (isEarlyStopped && isRetriggerEnabled) {
@@ -148,47 +161,62 @@ class HelixRetriggeringJobCallable implements Callable {
     }
   }
 
-  private void launchJobExecutionLauncherLoop() throws JobException {
+  /**
+   * <p> Launch a planning job. The actual job will be launched
+   * on a task driver instance, which handles the early-stop case
+   * in a single while-loop.
+   *
+   * @see GobblinHelixJobTask#run() the task driver logic
+   */
+  private void runJobExecutionLauncher() throws JobException {
     try {
-      while (true) {
-        String builderStr = 
jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
-            
GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
-
-        GobblinHelixDistributeJobExecutionLauncher.Builder builder = 
GobblinConstructorUtils
-            
.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(new
 ClassAliasResolver(
-                
GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
-
-        // Make a separate copy because we could update some of attributes in 
job properties (like adding planning id).
-        Properties jobPlanningProps = new Properties();
-        jobPlanningProps.putAll(this.jobProps);
-
-        // Inject planning id and start time
-        String planningId = 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
-            + JobState.getJobNameFromProps(jobPlanningProps));
-        
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningId);
-        
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME,
 String.valueOf(System.currentTimeMillis()));
-
-        builder.setSysProps(this.sysProps);
-        builder.setJobPlanningProps(jobPlanningProps);
-        builder.setManager(this.helixManager);
-        builder.setAppWorkDir(this.appWorkDir);
-
-        try (Closer closer = Closer.create()) {
-          GobblinHelixDistributeJobExecutionLauncher launcher = 
builder.build();
-          closer.register(launcher);
-          this.currentJobMonitor = launcher.launchJob(null);
-          ExecutionResult result = this.currentJobMonitor.get();
-          boolean isEarlyStopped = 
((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) 
result).isEarlyStopped();
-          boolean isRetriggerEnabled = this.isRetriggeringEnabled();
-          if (isEarlyStopped && isRetriggerEnabled) {
-            log.info("DistributeJob {} will be re-triggered.", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-          } else {
-            break;
-          }
-          currentJobMonitor = null;
-        } catch (Throwable t) {
-          throw new JobException("Failed to launch and run job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t);
+      String builderStr = 
jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
+          GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
+
+      // Check if any existing planning job is running
+      String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+      Optional<String> planningJobIdFromStore = 
jobsMapping.getPlanningJobId(jobName);
+
+      if (planningJobIdFromStore.isPresent()) {
+        String previousPlanningJobId = planningJobIdFromStore.get();
+        if (HelixUtils.isJobFinished(previousPlanningJobId, 
previousPlanningJobId, this.helixManager)) {
+          log.info("Previous planning job {} has reached to the final state. 
Start a new one.", previousPlanningJobId);
+        } else {
+          log.info("Previous planning job {} has not finished yet. Skip it.", 
previousPlanningJobId);
+          return;
         }
+      } else {
+        log.info("Planning job for {} does not exist. First time run.", 
jobName);
+      }
+
+      GobblinHelixDistributeJobExecutionLauncher.Builder builder = 
GobblinConstructorUtils
+          
.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(new
 ClassAliasResolver(
+              
GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
+
+      // Make a separate copy because we could update some of attributes in 
job properties (like adding planning id).
+      Properties jobPlanningProps = new Properties();
+      jobPlanningProps.putAll(this.jobProps);
+
+      // Inject planning id and start time
+      String planningId = 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
+          + JobState.getJobNameFromProps(jobPlanningProps));
+      
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningId);
+      
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME,
 String.valueOf(System.currentTimeMillis()));
+
+      builder.setSysProps(this.sysProps);
+      builder.setJobPlanningProps(jobPlanningProps);
+      builder.setManager(this.helixManager);
+      builder.setAppWorkDir(this.appWorkDir);
+
+      try (Closer closer = Closer.create()) {
+        GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
+        closer.register(launcher);
+        this.jobsMapping.setPlanningJobId(jobName, planningId);
+        this.currentJobMonitor = launcher.launchJob(null);
+        this.currentJobMonitor.get();
+        this.currentJobMonitor = null;
+      } catch (Throwable t) {
+        throw new JobException("Failed to launch and run job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t);
       }
     } catch (Exception e) {
       log.error("Failed to run job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 6fbc7c5..29539f0 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -173,6 +173,31 @@ public class HelixUtils {
     throw new TimeoutException("task driver wait time [" + timeoutInSeconds + 
" sec] is expired.");
   }
 
+  /**
+   * Checks whether a Helix job has reached a terminal state.
+   *
+   * <p>A missing workflow context is treated as finished. A job with no recorded state
+   * in an existing workflow context is treated as not finished, the same as any other
+   * non-terminal state.
+   *
+   * @param workflowName Helix workflow that contains the job
+   * @param jobName job name within the workflow (namespaced here via {@link TaskUtil#getNamespacedJobName})
+   * @param helixManager manager used to read the workflow context
+   * @return {@code true} if the job is STOPPED, FAILED, COMPLETED, ABORTED or TIMED_OUT,
+   *         or if the workflow context does not exist; {@code false} otherwise
+   */
+  static boolean isJobFinished(String workflowName, String jobName, HelixManager helixManager) {
+    WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workflowName);
+    if (workflowContext == null) {
+      // this workflow context doesn't exist, considered as finished.
+      return true;
+    }
+
+    TaskState jobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workflowName, jobName));
+    if (jobState == null) {
+      // No state recorded for this job yet; a switch on a null enum would throw NullPointerException.
+      return false;
+    }
+
+    switch (jobState) {
+      case STOPPED:
+      case FAILED:
+      case COMPLETED:
+      case ABORTED:
+      case TIMED_OUT:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Deletes a Helix workflow through a fresh {@link TaskDriver} and waits for the deletion
+   * via {@link TaskDriver#deleteAndWaitForCompletion}.
+   *
+   * @param workflowName workflow to delete
+   * @param helixManager manager used to construct the {@link TaskDriver}
+   * @param timeOut wait limit passed to {@code deleteAndWaitForCompletion} (presumably milliseconds — confirm)
+   * @throws InterruptedException propagated from {@code deleteAndWaitForCompletion}
+   */
+  static void deleteWorkflow(String workflowName, HelixManager helixManager, long timeOut) throws InterruptedException {
+    new TaskDriver(helixManager).deleteAndWaitForCompletion(workflowName, timeOut);
+  }
+
   static void handleJobTimeout(String workFlowName, String jobName, 
HelixManager helixManager, Object jobLauncher,
       JobListener jobListener) throws InterruptedException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
index 155304a..2e98cfb 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -17,9 +17,7 @@
 
 package org.apache.gobblin.cluster;
 
-import java.io.IOException;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
@@ -32,14 +30,14 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.extractor.partition.Partitioner;
-import org.apache.gobblin.util.PropertiesUtils;
 
 
 public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel {
+
   private TaskFactory testJobFactory;
+
   public 
TaskRunnerSuiteForJobFactoryTest(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder
 builder) {
     super(builder);
     this.testJobFactory = new TestJobFactory(builder, this.metricContext);
@@ -62,7 +60,7 @@ public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel
     @Override
     public Task createNewTask(TaskCallbackContext context) {
       return new TestHelixJobTask(context,
-          stateStores,
+          jobsMapping,
           builder,
           new GobblinHelixJobLauncherMetrics("launcherInJobFactory", 
metricContext, 5),
           new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, 
5));
@@ -71,25 +69,16 @@ public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel
 
+  /**
+   * {@link GobblinHelixJobTask} used by this test suite. Its constructor passes all
+   * arguments unchanged to the superclass constructor.
+   */
   public class TestHelixJobTask extends GobblinHelixJobTask {
     public TestHelixJobTask(TaskCallbackContext context,
-                            StateStores stateStores,
+                            HelixJobsMapping jobsMapping,
                              TaskRunnerSuiteBase.Builder builder,
                              GobblinHelixJobLauncherMetrics launcherMetrics,
                              GobblinHelixJobTaskMetrics jobTaskMetrics) {
       super(context,
-            stateStores,
+            jobsMapping,
             builder,
             launcherMetrics,
             jobTaskMetrics);
     }
-
-    //TODO: change below to Helix UserConentStore
-    protected void setResultToUserContent(Map<String, String> keyValues) 
throws IOException {
-      Map<String, String> customizedKVs = Maps.newHashMap(keyValues);
-      customizedKVs.put("customizedKey_1", "customizedVal_1");
-      customizedKVs.put("customizedKey_2", "customizedVal_2");
-      customizedKVs.put("customizedKey_3", "customizedVal_3");
-      super.setResultToUserContent(customizedKVs);
-    }
   }
 
   @Slf4j
@@ -99,21 +88,23 @@ public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel
       super(builder);
     }
 
-    //TODO: change below to Helix UserConentStore
     protected DistributeJobResult getResultFromUserContent() {
       DistributeJobResult rst = super.getResultFromUserContent();
-      Properties properties = rst.getProperties().get();
-      Assert.assertTrue(properties.containsKey(Partitioner.IS_EARLY_STOPPED));
-      Assert.assertFalse(PropertiesUtils.getPropAsBoolean(properties, 
Partitioner.IS_EARLY_STOPPED, "false"));
+      Assert.assertTrue(!rst.isEarlyStopped());
+      String jobName = 
this.jobPlanningProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+      String planningJobId = 
this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
 
-      
Assert.assertTrue(properties.getProperty("customizedKey_1").equals("customizedVal_1"));
-      
Assert.assertTrue(properties.getProperty("customizedKey_2").equals("customizedVal_2"));
-      
Assert.assertTrue(properties.getProperty("customizedKey_3").equals("customizedVal_3"));
+      try {
+        String planningJobFromStore = 
this.jobsMapping.getPlanningJobId(jobName).get();
+        Assert.assertTrue(planningJobFromStore.equals(planningJobId));
+
+      } catch (Exception e) {
+        Assert.fail(e.toString());
+      }
       IntegrationJobFactorySuite.completed.set(true);
       return rst;
     }
 
-
     @Alias("TestDistributedExecutionLauncherBuilder")
     public static class Builder extends 
GobblinHelixDistributeJobExecutionLauncher.Builder {
       public TestDistributedExecutionLauncher build() throws Exception {

Reply via email to