Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 92f4255bf -> 754b06696


[GOBBLIN-661] Prevent jobs resubmission after manager failure

Closes #2532 from kyuamazon/catalog


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/754b0669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/754b0669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/754b0669

Branch: refs/heads/master
Commit: 754b066960e9cb1b9fd4c93f2238d3efc056b550
Parents: 92f4255
Author: Kuai Yu <[email protected]>
Authored: Mon Jan 14 11:04:06 2019 -0800
Committer: Hung Tran <[email protected]>
Committed: Mon Jan 14 11:04:06 2019 -0800

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |  11 +
 ...blinHelixDistributeJobExecutionLauncher.java |  13 +-
 .../cluster/GobblinHelixJobScheduler.java       |  84 ++++----
 .../gobblin/cluster/GobblinHelixJobTask.java    |  31 ++-
 .../cluster/GobblinHelixMultiManager.java       |  49 +++--
 .../GobblinHelixPlanningJobLauncherMetrics.java |  23 ++-
 .../gobblin/cluster/HelixJobsMapping.java       |  30 ++-
 .../cluster/HelixRetriggeringJobCallable.java   | 202 +++++++++++++------
 .../org/apache/gobblin/cluster/HelixUtils.java  |  38 ++--
 .../TaskRunnerSuiteForJobFactoryTest.java       |   6 +-
 .../job_catalog/NonObservingFSJobCatalog.java   |   3 +-
 11 files changed, 327 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 6791d52..faeaf17 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -90,6 +90,9 @@ public class GobblinClusterConfigurationKeys {
   public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey";
   public static final String PLANNING_JOB_CREATE_TIME = PLANNING_CONF_PREFIX + 
"createTime";
 
+  // Actual job properties
+  public static final String ACTUAL_JOB_NAME_PREFIX = "ActualJob";
+
   // job spec operation
   public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + 
"job.alwaysDelete";
 
@@ -114,6 +117,8 @@ public class GobblinClusterConfigurationKeys {
   public static final String JOB_CONFIGURATION_MANAGER_KEY = 
GOBBLIN_CLUSTER_PREFIX + "job.configuration.manager";
 
   public static final String JOB_SPEC_REFRESH_INTERVAL = 
GOBBLIN_CLUSTER_PREFIX + "job.spec.refresh.interval";
+  public static final String JOB_SPEC_URI = GOBBLIN_CLUSTER_PREFIX + 
"job.spec.uri";
+
   public static final String SPEC_CONSUMER_CLASS_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "specConsumer.class";
   public static final String DEFAULT_SPEC_CONSUMER_CLASS =
       "org.apache.gobblin.service.SimpleKafkaSpecConsumer";
@@ -143,4 +148,10 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "workflowDeleteTimeoutSeconds";
   public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300;
 
+  public static final String CLEAN_ALL_DIST_JOBS = GOBBLIN_CLUSTER_PREFIX + 
"bootup.clean.dist.jobs";
+  public static final boolean DEFAULT_CLEAN_ALL_DIST_JOBS = false;
+
+  public static final String KILL_DUPLICATE_PLANNING_JOB = 
GOBBLIN_CLUSTER_PREFIX + "kill.duplicate.planningJob";
+  public static final boolean DEFAULT_KILL_DUPLICATE_PLANNING_JOB = true;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 9424ca8..b14b700 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -122,16 +122,12 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
     Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
         .withFallback(ConfigUtils.propertiesToConfig(sysProps));
 
-    this.jobsMapping = new HelixJobsMapping(combined,
-                                            
PathUtils.getRootPath(builder.appWorkDir).toUri(),
-                                            builder.appWorkDir.toString());
-
     this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined,
         GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
     this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics;
     this.helixMetrics = builder.helixMetrics;
-
+    this.jobsMapping = builder.jobsMapping;
     this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined,
         GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
@@ -172,6 +168,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
     Path appWorkDir;
     GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
     GobblinHelixMetrics helixMetrics;
+    HelixJobsMapping jobsMapping;
     public GobblinHelixDistributeJobExecutionLauncher build() throws Exception 
{
       return new GobblinHelixDistributeJobExecutionLauncher(this);
     }
@@ -288,9 +285,11 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
   }
 
   private DistributeJobResult waitForJobCompletion(String workFlowName, String 
jobName) throws InterruptedException {
-    boolean timeoutEnabled = 
Boolean.parseBoolean(this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+    boolean timeoutEnabled = 
Boolean.parseBoolean(this.jobPlanningProps.getProperty(
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
-    long timeoutInSeconds = 
Long.parseLong(this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+    long timeoutInSeconds = Long.parseLong(this.jobPlanningProps.getProperty(
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
 
     try {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 9cf757c..79b088b 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.cluster;
 
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
@@ -28,6 +26,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
@@ -38,6 +37,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Striped;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
@@ -54,6 +54,7 @@ import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
 
@@ -91,6 +92,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
   final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics;
   final GobblinHelixJobLauncherMetrics launcherMetrics;
   final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
+  final HelixJobsMapping jobsMapping;
+  final Striped<Lock> locks = Striped.lazyWeakLock(256);
 
   private boolean startServicesCompleted;
 
@@ -125,9 +128,13 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
                                                                    
this.metricContext,
                                                                    
metricsWindowSizeInMin);
 
+    this.jobsMapping = new 
HelixJobsMapping(ConfigUtils.propertiesToConfig(properties),
+        PathUtils.getRootPath(appWorkDir).toUri(),
+        appWorkDir.toString());
+
     this.planningJobLauncherMetrics = new 
GobblinHelixPlanningJobLauncherMetrics("planningLauncherInScheduler",
                                                                           
this.metricContext,
-                                                                          
metricsWindowSizeInMin);
+                                                                          
metricsWindowSizeInMin, this.jobsMapping);
 
     this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobScheduler",
                                                   this.metricContext,
@@ -171,11 +178,24 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
 
   @Override
   protected void startServices() throws Exception {
+
+    boolean cleanAllDistJobs = 
PropertiesUtils.getPropAsBoolean(this.properties,
+        GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS,
+        
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CLEAN_ALL_DIST_JOBS));
+
+    if (cleanAllDistJobs) {
+      for (org.apache.gobblin.configuration.State state : 
this.jobsMapping.getAllStates()) {
+        String jobName = state.getId();
+        LOGGER.info("Delete mapping for job " + jobName);
+        this.jobsMapping.deleteMapping(jobName);
+      }
+    }
   }
 
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     new HelixRetriggeringJobCallable(this,
+        this.jobCatalog,
         this.properties,
         jobProps,
         jobListener,
@@ -183,7 +203,9 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         this.helixMetrics,
         this.appWorkDir,
         this.jobHelixManager,
-        this.taskDriverHelixManager).call();
+        this.taskDriverHelixManager,
+        this.jobsMapping,
+        this.locks).call();
   }
 
   @Override
@@ -203,6 +225,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
   public Future<?> scheduleJobImmediately(Properties jobProps, JobListener 
jobListener) {
     HelixRetriggeringJobCallable retriggeringJob = new 
HelixRetriggeringJobCallable(this,
+        this.jobCatalog,
         this.properties,
         jobProps,
         jobListener,
@@ -210,7 +233,9 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         this.helixMetrics,
         this.appWorkDir,
         this.jobHelixManager,
-        this.taskDriverHelixManager);
+        this.taskDriverHelixManager,
+        this.jobsMapping,
+        this.locks);
 
     final Future<?> future = this.jobExecutor.submit(retriggeringJob);
     return new Future() {
@@ -256,24 +281,28 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
 
   @Subscribe
   public void handleNewJobConfigArrival(NewJobConfigArrivalEvent 
newJobArrival) {
-    LOGGER.info("Received new job configuration of job " + 
newJobArrival.getJobName());
+    String jobUri = newJobArrival.getJobName();
+    LOGGER.info("Received new job configuration of job " + jobUri);
     try {
       Properties jobProps = new Properties();
       jobProps.putAll(newJobArrival.getJobConfig());
 
+      // set uri so that we can delete it later
+      jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
+
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
 
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-        LOGGER.info("Scheduling job " + newJobArrival.getJobName());
+        LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
                     new GobblinHelixJobLauncherListener(this.launcherMetrics));
       } else {
-        LOGGER.info("No job schedule found, so running job " + 
newJobArrival.getJobName());
-        this.jobExecutor.execute(new 
NonScheduledJobRunner(newJobArrival.getJobName(), jobProps,
+        LOGGER.info("No job schedule found, so running job " + jobUri);
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
                                  new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
       }
     } catch (JobException je) {
-      LOGGER.error("Failed to schedule or run job " + 
newJobArrival.getJobName(), je);
+      LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
@@ -308,59 +337,26 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
    * This class is responsible for running non-scheduled jobs.
    */
   class NonScheduledJobRunner implements Runnable {
-
-    private final String jobUri;
     private final Properties jobProps;
     private final GobblinHelixJobLauncherListener jobListener;
     private final Long creationTimeInMillis;
 
-    public NonScheduledJobRunner(String jobUri,
-                                 Properties jobProps,
+    public NonScheduledJobRunner(Properties jobProps,
                                  GobblinHelixJobLauncherListener jobListener) {
 
-      this.jobUri = jobUri;
       this.jobProps = jobProps;
       this.jobListener = jobListener;
       this.creationTimeInMillis = System.currentTimeMillis();
     }
 
-    private void deleteJobSpec(boolean alwaysDelete, boolean isDeleted) {
-      if (alwaysDelete && !isDeleted) {
-        try {
-          GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri));
-        } catch (URISyntaxException e) {
-          LOGGER.error("Always delete " + jobUri + ". Failed to remove job 
with bad uri " + jobUri, e);
-        }
-      }
-    }
-
     @Override
     public void run() {
-      boolean alwaysDelete = PropertiesUtils.getPropAsBoolean(this.jobProps,
-                                                              
GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE,
-                                                              "false");
-      boolean isDeleted = false;
-
       try {
         
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBeforeJobLaunching(this.jobProps);
         
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis,
 System.currentTimeMillis());
         GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
-
-        // remove non-scheduled job catalog once done so it won't be 
re-executed
-        if (GobblinHelixJobScheduler.this.jobCatalog != null) {
-          try {
-            GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri));
-            isDeleted = true;
-          } catch (URISyntaxException e) {
-            LOGGER.error("Failed to remove job with bad uri " + jobUri, e);
-          }
-        }
       } catch (JobException je) {
-        deleteJobSpec(alwaysDelete, isDeleted);
         LOGGER.error("Failed to run job " + 
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
-      } catch (Exception e) {
-        deleteJobSpec(alwaysDelete, isDeleted);
-        throw e;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index ff61ea6..aa2d7fa 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -65,6 +65,7 @@ class GobblinHelixJobTask implements Task {
   private final String planningJobId;
   private final HelixManager jobHelixManager;
   private final Path appWorkDir;
+  private final String jobName;
   private final List<? extends Tag<?>> metadataTags;
   private GobblinHelixJobLauncher launcher;
   private GobblinHelixJobTaskMetrics jobTaskMetrics;
@@ -99,6 +100,7 @@ class GobblinHelixJobTask implements Task {
       throw new RuntimeException("Job doesn't have planning ID");
     }
 
+    this.jobName = 
jobPlusSysConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY);
     this.planningJobId = 
jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
     this.jobsMapping = jobsMapping;
     this.appWorkDir = builder.getAppWorkPath();
@@ -147,8 +149,6 @@ class GobblinHelixJobTask implements Task {
     
this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
 
     try (Closer closer = Closer.create()) {
-      String jobName = 
jobPlusSysConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY);
-
       Optional<String> planningIdFromStateStore = 
this.jobsMapping.getPlanningJobId(jobName);
 
       long timeOut = PropertiesUtils.getPropAsLong(jobPlusSysConfig,
@@ -171,14 +171,19 @@ class GobblinHelixJobTask implements Task {
             try {
               HelixUtils.deleteWorkflow(previousActualJobId, 
this.jobHelixManager, timeOut);
             } catch (HelixException e) {
-              log.error("Helix cannot delete previous actual job id {} within 
5 min.", previousActualJobId);
+              log.error("Helix cannot delete previous actual job id {} within 
{} seconds.", previousActualJobId, timeOut / 1000);
               return new TaskResult(TaskResult.Status.FAILED, 
ExceptionUtils.getFullStackTrace(e));
             }
           }
         } else {
-          log.info("Actual job {} does not exist. First time run.", 
this.planningJobId);
+          log.info("No previous actual job [plan: {}]. First time run.", 
this.planningJobId);
         }
 
+        String actualJobId = 
HelixJobsMapping.createActualJobId(jobPlusSysConfig);
+        log.info("Planning job {} creates actual job {}", this.planningJobId, 
actualJobId);
+
+        this.jobPlusSysConfig.setProperty(ConfigurationKeys.JOB_ID_KEY, 
actualJobId);
+
         this.launcher = createJobLauncher();
 
         this.jobsMapping.setActualJobId(jobName, this.planningJobId, 
this.launcher.getJobId());
@@ -191,13 +196,20 @@ class GobblinHelixJobTask implements Task {
           log.info("Planning job {} has more runs due to early stop.", 
this.planningJobId);
         }
       }
+      log.info("Completing planning job {}", this.planningJobId);
+      return new TaskResult(TaskResult.Status.COMPLETED, "");
     } catch (Exception e) {
       log.info("Failing planning job {}", this.planningJobId);
       return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for 
job " + planningJobId + ":" + ExceptionUtils
           .getFullStackTrace(e));
+    } finally {
+      // Always clean up the job mapping for the current job name.
+      try {
+        this.jobsMapping.deleteMapping(jobName);
+      } catch (Exception e) {
+        return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs 
mapping for job : " + jobName);
+      }
     }
-    log.info("Completing planning job {}", this.planningJobId);
-    return new TaskResult(TaskResult.Status.COMPLETED, "");
   }
 
   @Override
@@ -208,6 +220,13 @@ class GobblinHelixJobTask implements Task {
         launcher.cancelJob(this.jobLauncherListener);
       } catch (JobException e) {
         throw new RuntimeException("Unable to cancel planning job " + 
this.planningJobId + ": ", e);
+      } finally {
+        // Always clean up the job mapping for the current job name.
+        try {
+          this.jobsMapping.deleteMapping(jobName);
+        } catch (Exception e) {
+          throw new RuntimeException("Cannot delete jobs mapping for job : " + 
jobName);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index 81ed7ce..03ae69f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -205,7 +205,6 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
                 InstanceType.CONTROLLER));
       }
 
-      // This will creat a dedicated controller for planning job distribution
       if (this.dedicatedTaskDriverCluster) {
         // This will create a Helix administrator to dispatch jobs to ZooKeeper
         this.taskDriverHelixManager = 
Optional.of(buildHelixManager(this.config,
@@ -218,6 +217,7 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
             
GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED,
             true);
 
+        // This will create a dedicated controller for planning job distribution
         if (this.dedicatedTaskDriverClusterController) {
           this.taskDriverClusterController = 
Optional.of(GobblinHelixMultiManager
               .buildHelixManager(this.config, zkConnectionString, 
GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
@@ -339,20 +339,10 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
       if (!isLeader) {
         log.info("New Helix Controller leader {}", 
this.managerClusterHelixManager.getInstanceName());
 
-        // Clean up existing jobs
-        TaskDriver taskDriver = new TaskDriver(this.jobClusterHelixManager);
-        Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows();
+        cleanUpJobs(this.jobClusterHelixManager);
 
-        for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) {
-          String queueName = entry.getKey();
-          WorkflowConfig workflowConfig = entry.getValue();
-
-          // request delete if not already requested
-          if (workflowConfig.getTargetState() != TargetState.DELETE) {
-            taskDriver.delete(queueName);
-
-            log.info("Requested delete of queue {}", queueName);
-          }
+        if (this.taskDriverHelixManager.isPresent()) {
+          cleanUpJobs(this.taskDriverHelixManager.get());
         }
 
         for (LeadershipChangeAwareComponent c: 
this.leadershipChangeAwareComponents) {
@@ -373,6 +363,37 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
     }
   }
 
+  private void cleanUpJobs(HelixManager helixManager) {
+    // Clean up existing jobs
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+
+    Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows();
+
+    boolean cleanupDistJobs = ConfigUtils.getBoolean(this.config,
+        GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS,
+        GobblinClusterConfigurationKeys.DEFAULT_CLEAN_ALL_DIST_JOBS);
+
+    for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) {
+      String workflowName = entry.getKey();
+
+      if 
(workflowName.contains(GobblinClusterConfigurationKeys.PLANNING_CONF_PREFIX)
+          || 
workflowName.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX)) {
+        if (!cleanupDistJobs) {
+          log.info("Distributed job {} won't be deleted.", workflowName);
+          continue;
+        }
+      }
+
+      WorkflowConfig workflowConfig = entry.getValue();
+
+      // request delete if not already requested
+      if (workflowConfig.getTargetState() != TargetState.DELETE) {
+        taskDriver.delete(workflowName);
+
+        log.info("Requested delete of workflowName {}", workflowName);
+      }
+    }
+  }
 
   /**
    * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that 
handle messages of type

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
index 593e72d..8758c9b 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java
@@ -17,34 +17,47 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.MetricContext;
 
 
 public class GobblinHelixPlanningJobLauncherMetrics extends 
StandardMetricsBridge.StandardMetrics {
   private final String metricsName;
-
+  private final HelixJobsMapping jobsMapping;
   public static final String TIMER_FOR_COMPLETED_PLANNING_JOBS = 
"timeForCompletedPlanningJobs";
   public static final String TIMER_FOR_FAILED_PLANNING_JOBS = 
"timeForFailedPlanningJobs";
-
+  public static final String NUM_ACTIVE_PLANNING_JOBS = 
"numActivePlanningJobs";
   final ContextAwareTimer timeForCompletedPlanningJobs;
   final ContextAwareTimer timeForFailedPlanningJobs;
+  final ContextAwareGauge<Integer> numActivePlanningJobs;
 
   public GobblinHelixPlanningJobLauncherMetrics(String metricsName,
       final MetricContext metricContext,
-      int windowSizeInMin) {
+      int windowSizeInMin,
+      HelixJobsMapping jobsMapping) {
 
     this.metricsName = metricsName;
-
+    this.jobsMapping = jobsMapping;
     this.timeForCompletedPlanningJobs = 
metricContext.contextAwareTimer(TIMER_FOR_COMPLETED_PLANNING_JOBS, 
windowSizeInMin, TimeUnit.MINUTES);
     this.timeForFailedPlanningJobs = 
metricContext.contextAwareTimer(TIMER_FOR_FAILED_PLANNING_JOBS, 
windowSizeInMin, TimeUnit.MINUTES);
-
+    this.numActivePlanningJobs =  
metricContext.newContextAwareGauge(NUM_ACTIVE_PLANNING_JOBS, 
()->getNumOfMappings());
     this.contextAwareMetrics.add(timeForCompletedPlanningJobs);
     this.contextAwareMetrics.add(timeForFailedPlanningJobs);
+    this.contextAwareMetrics.add(numActivePlanningJobs);
+  }
+
+  private int getNumOfMappings() {
+    try {
+      return this.jobsMapping.getAllStates().size();
+    } catch (IOException e) {
+      return 0;
+    }
   }
 
   public void updateTimeForCompletedPlanningJobs(long startTime) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index 8c2a836..8124ba1 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
@@ -30,8 +32,10 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
 
 
 /**
@@ -58,8 +62,8 @@ public class HelixJobsMapping {
 
   public static final String DISTRIBUTED_STATE_STORE_NAME_KEY = 
"jobs.mapping.distributed.state.store.name";
   public static final String DEFAULT_DISTRIBUTED_STATE_STORE_NAME = 
"distributedState";
- 
-  private StateStore stateStore;
+
+  private StateStore<State> stateStore;
   private String distributedStateStoreName;
 
   public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) {
@@ -94,6 +98,16 @@ public class HelixJobsMapping {
     this.stateStore = stateStoreFactory.createStateStore(stateStoreJobConfig, 
State.class);
   }
 
+  public static String createPlanningJobId (Properties jobPlanningProps) {
+    return 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
+        + JobState.getJobNameFromProps(jobPlanningProps));
+  }
+
+  public static String createActualJobId (Properties jobProps) {
+    return 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+        + JobState.getJobNameFromProps(jobProps));
+  }
+
   @Nullable
   private State getOrCreate (String storeName, String jobName) throws 
IOException {
     if (this.stateStore.exists(storeName, jobName)) {
@@ -102,8 +116,8 @@ public class HelixJobsMapping {
     return new State();
   }
 
-  private void delete (String storeName, String jobName) throws IOException {
-    this.stateStore.delete(storeName, jobName);
+  public void deleteMapping (String jobName) throws IOException {
+    this.stateStore.delete(this.distributedStateStoreName, jobName);
   }
 
   public void setPlanningJobId (String jobName, String planningJobId) throws 
IOException {
@@ -112,7 +126,7 @@ public class HelixJobsMapping {
     state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningJobId);
     // fs state store use hdfs rename, which assumes the target file doesn't 
exist.
     if (stateStore instanceof FsStateStore) {
-      this.delete(distributedStateStoreName, jobName);
+      this.deleteMapping(jobName);
     }
     this.stateStore.put(distributedStateStoreName, jobName, state);
   }
@@ -124,7 +138,7 @@ public class HelixJobsMapping {
     state.setProp(ConfigurationKeys.JOB_ID_KEY, actualJobId);
     // fs state store use hdfs rename, which assumes the target file doesn't exist.
     if (stateStore instanceof FsStateStore) {
-      this.delete(distributedStateStoreName, jobName);
+      this.deleteMapping(jobName);
     }
     this.stateStore.put(distributedStateStoreName, jobName, state);
   }
@@ -140,6 +154,10 @@ public class HelixJobsMapping {
     return id == null? Optional.empty() : Optional.of(id);
   }
 
+  public List<State> getAllStates() throws IOException {
+    return this.stateStore.getAll(distributedStateStoreName);
+  }
+
   public Optional<String> getActualJobId (String jobName) throws IOException {
     return getId(jobName, ConfigurationKeys.JOB_ID_KEY);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index f563198..5223c71 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -17,29 +17,30 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 
 import com.google.common.io.Closer;
+import com.google.common.util.concurrent.Striped;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.api.JobExecutionMonitor;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.util.ClassAliasResolver;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JobLauncherUtils;
-import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -83,6 +84,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Alpha
 class HelixRetriggeringJobCallable implements Callable {
   private final GobblinHelixJobScheduler jobScheduler;
+  private final MutableJobCatalog jobCatalog;
   private final Properties sysProps;
   private final Properties jobProps;
   private final JobListener jobListener;
@@ -95,9 +97,13 @@ class HelixRetriggeringJobCallable implements Callable {
   private GobblinHelixJobLauncher currentJobLauncher = null;
   private JobExecutionMonitor currentJobMonitor = null;
   private boolean isDistributeJobEnabled = false;
+  private final String jobUri;
+  private boolean jobDeleteAttempted = false;
+  private final Striped<Lock> locks;
 
   public HelixRetriggeringJobCallable(
       GobblinHelixJobScheduler jobScheduler,
+      MutableJobCatalog jobCatalog,
       Properties sysProps,
       Properties jobProps,
       JobListener jobListener,
@@ -105,8 +111,11 @@ class HelixRetriggeringJobCallable implements Callable {
       GobblinHelixMetrics helixMetrics,
       Path appWorkDir,
       HelixManager jobHelixManager,
-      Optional<HelixManager> taskDriverHelixManager) {
+      Optional<HelixManager> taskDriverHelixManager,
+      HelixJobsMapping jobsMapping,
+      Striped<Lock> locks) {
     this.jobScheduler = jobScheduler;
+    this.jobCatalog = jobCatalog;
     this.sysProps = sysProps;
     this.jobProps = jobProps;
     this.jobListener = jobListener;
@@ -116,9 +125,9 @@ class HelixRetriggeringJobCallable implements Callable {
     this.jobHelixManager = jobHelixManager;
     this.taskDriverHelixManager = taskDriverHelixManager;
     this.isDistributeJobEnabled = isDistributeJobEnabled();
-    this.jobsMapping = new 
HelixJobsMapping(ConfigUtils.propertiesToConfig(sysProps),
-                                            
PathUtils.getRootPath(appWorkDir).toUri(),
-                                            appWorkDir.toString());
+    this.jobUri = 
jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI);
+    this.jobsMapping = jobsMapping;
+    this.locks = locks;
   }
 
   private boolean isRetriggeringEnabled() {
@@ -137,15 +146,47 @@ class HelixRetriggeringJobCallable implements Callable {
 
   @Override
   public Void call() throws JobException {
-    if (this.isDistributeJobEnabled) {
-      runJobExecutionLauncher();
-    } else {
-      runJobLauncherLoop();
+    boolean deleteJobWhenException = 
PropertiesUtils.getPropAsBoolean(this.jobProps,
+        GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE,
+        "false");
+
+    try {
+      if (this.isDistributeJobEnabled) {
+        runJobExecutionLauncher();
+      } else {
+        runJobLauncherLoop();
+      }
+
+      deleteJobSpec();
+    } catch (Exception e) { // delete job spec when exception occurred
+      if (deleteJobWhenException) {
+        deleteJobSpec();
+      }
+      throw e;
     }
 
     return null;
   }
 
+  private void deleteJobSpec() throws JobException {
+    boolean runOnce = 
Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, 
"false"));
+    boolean hasSchedule = 
jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY);
+    if (runOnce || !hasSchedule) {
+      if (this.jobCatalog != null) {
+        try {
+          if (!this.jobDeleteAttempted) {
+            log.info("Deleting job spec on {}", this.jobUri);
+            this.jobScheduler.unscheduleJob(this.jobUri);
+            this.jobCatalog.remove(new URI(jobUri));
+            this.jobDeleteAttempted = true;
+          }
+        } catch (URISyntaxException e) {
+          log.error("Failed to remove job with bad uri " + jobUri, e);
+        }
+      }
+    }
+  }
+
   /**
    * <p> In some cases, the job launcher will be early stopped.
    * It can be due to the large volume of input source data.
@@ -181,74 +222,111 @@ class HelixRetriggeringJobCallable implements Callable {
    */
   private void runJobExecutionLauncher() throws JobException {
     long startTime = 0;
+    String newPlanningId;
+    String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+    Closer closer = Closer.create();
     try {
+      HelixManager planningJobManager = 
this.taskDriverHelixManager.isPresent()?
+          this.taskDriverHelixManager.get() : this.jobHelixManager;
+
       String builderStr = 
jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
           GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
 
       // Check if any existing planning job is running
-      String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
       Optional<String> planningJobIdFromStore = 
jobsMapping.getPlanningJobId(jobName);
 
-      if (planningJobIdFromStore.isPresent()) {
-        String previousPlanningJobId = planningJobIdFromStore.get();
-        HelixManager planningJobManager = 
this.taskDriverHelixManager.isPresent()?
-            this.taskDriverHelixManager.get() : this.jobHelixManager;
+      // start of critical section to check if a job with same job name is running
+      Lock jobLock = locks.get(jobName);
+      jobLock.lock();
+
+      try {
+        if (planningJobIdFromStore.isPresent()) {
+          String previousPlanningJobId = planningJobIdFromStore.get();
 
-        if (HelixUtils.isJobFinished(previousPlanningJobId, 
previousPlanningJobId, planningJobManager)) {
-          log.info("Previous planning job {} has reached to the final state. 
Start a new one.", previousPlanningJobId);
+          if (HelixUtils.isJobFinished(previousPlanningJobId, 
previousPlanningJobId, planningJobManager)) {
+            log.info("Previous planning job {} has reached to the final state. 
Start a new one.", previousPlanningJobId);
+          } else {
+            boolean killDuplicateJob = PropertiesUtils
+                .getPropAsBoolean(this.jobProps, 
GobblinClusterConfigurationKeys.KILL_DUPLICATE_PLANNING_JOB, 
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_KILL_DUPLICATE_PLANNING_JOB));
+
+            if (!killDuplicateJob) {
+              log.info("Previous planning job {} has not finished yet. Skip 
this job.", previousPlanningJobId);
+              return;
+            } else {
+              log.info("Previous planning job {} has not finished yet. Kill 
it.", previousPlanningJobId);
+              long timeOut = PropertiesUtils.getPropAsLong(sysProps, 
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS,
+                  
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS) 
* 1000;
+              try {
+                HelixUtils.deleteWorkflow(previousPlanningJobId, 
planningJobManager, timeOut);
+              } catch (HelixException e) {
+                log.info("Helix cannot delete previous planning job id {} 
within {} seconds.", previousPlanningJobId,
+                    timeOut / 1000);
+                throw new JobException("Helix cannot delete previous planning 
job id " + previousPlanningJobId, e);
+              }
+            }
+          }
         } else {
-          log.info("Previous planning job {} has not finished yet. Skip it.", 
previousPlanningJobId);
-          return;
+          log.info("Planning job for {} does not exist. First time run.", 
jobName);
         }
-      } else {
-        log.info("Planning job for {} does not exist. First time run.", 
jobName);
-      }
 
-      GobblinHelixDistributeJobExecutionLauncher.Builder builder = 
GobblinConstructorUtils
-          
.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(new
 ClassAliasResolver(
-              
GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
-
-      // Make a separate copy because we could update some of attributes in job properties (like adding planning id).
-      Properties jobPlanningProps = new Properties();
-      jobPlanningProps.putAll(this.jobProps);
-
-      // Inject planning id and start time
-      String planningId = 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
-          + JobState.getJobNameFromProps(jobPlanningProps));
-      
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningId);
-      
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME,
 String.valueOf(System.currentTimeMillis()));
-
-      builder.setSysProps(this.sysProps);
-      builder.setJobPlanningProps(jobPlanningProps);
-      builder.setJobHelixManager(this.jobHelixManager);
-      builder.setTaskDriverHelixManager(this.taskDriverHelixManager);
-      builder.setAppWorkDir(this.appWorkDir);
-      builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
-      builder.setHelixMetrics(this.helixMetrics);
-
-      try (Closer closer = Closer.create()) {
-        log.info("Planning job {} started.", planningId);
+        GobblinHelixDistributeJobExecutionLauncher.Builder builder = 
GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(
+            new 
ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
+
+        // Make a separate copy because we could update some of attributes in job properties (like adding planning id).
+        Properties jobPlanningProps = new Properties();
+        jobPlanningProps.putAll(this.jobProps);
+
+        // Inject planning id and start time
+        newPlanningId = HelixJobsMapping.createPlanningJobId(jobPlanningProps);
+        
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
newPlanningId);
+        
jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME,
 String.valueOf(System.currentTimeMillis()));
+
+        builder.setSysProps(this.sysProps);
+        builder.setJobPlanningProps(jobPlanningProps);
+        builder.setJobHelixManager(this.jobHelixManager);
+        builder.setTaskDriverHelixManager(this.taskDriverHelixManager);
+        builder.setAppWorkDir(this.appWorkDir);
+        builder.setJobsMapping(this.jobsMapping);
+        builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
+        builder.setHelixMetrics(this.helixMetrics);
+
+        log.info("Planning job {} started.", newPlanningId);
         GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
         closer.register(launcher);
-        this.jobsMapping.setPlanningJobId(jobName, planningId);
+        this.jobsMapping.setPlanningJobId(jobName, newPlanningId);
         startTime = System.currentTimeMillis();
         this.currentJobMonitor = launcher.launchJob(null);
-        this.currentJobMonitor.get();
-        this.currentJobMonitor = null;
-        log.info("Planning job {} finished.", planningId);
-        
this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
-      } catch (Throwable t) {
-        if (startTime != 0) {
-          
this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
-        }
-        throw new JobException("Failed to launch and run planning job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t);
+
+        // make sure the planning job will be visible to other parallel running threads,
+        // so that the same critical section check (querying Helix for job completeness)
+        // can be applied.
+        HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, 
newPlanningId, 300_000);
+
+      } finally {
+        // end of the critical section to check if a job with same job name is running
+        jobLock.unlock();
       }
+
+      // we can remove the job spec from the catalog because Helix will drive this job to the end.
+      this.deleteJobSpec();
+
+      this.currentJobMonitor.get();
+      this.currentJobMonitor = null;
+      log.info("Planning job {} finished.", newPlanningId);
+      
this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
+
     } catch (Exception e) {
       if (startTime != 0) {
         
this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
       }
-      log.error("Failed to run planning job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
-      throw new JobException("Failed to run planning job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+      log.error("Failed to run planning job {}", jobName, e);
+      throw new JobException("Failed to run planning job " + jobName, e);
+    } finally {
+      try {
+        closer.close();
+      } catch (IOException e) {
+        throw new JobException("Cannot properly close planning job " + 
jobName, e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index f67e77c..69ae823 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -107,19 +107,11 @@ public class HelixUtils {
     submitJobToWorkFlow(jobConfigBuilder, queueName, jobName, helixTaskDriver, 
helixManager, jobQueueDeleteTimeoutSeconds);
   }
 
-  public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder,
+  static void waitJobInitialization(
+      HelixManager helixManager,
       String workFlowName,
       String jobName,
-      TaskDriver helixTaskDriver,
-      HelixManager helixManager,
-      long workFlowExpiryTime) throws Exception {
-
-    WorkflowConfig workFlowConfig = new 
WorkflowConfig.Builder().setExpiry(workFlowExpiryTime, 
TimeUnit.SECONDS).build();
-    // Create a work flow for each job with the name being the queue name
-    Workflow workFlow = new 
Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName,
 jobConfigBuilder).build();
-    // start the workflow
-    helixTaskDriver.start(workFlow);
-    log.info("Created a work flow {}", workFlowName);
+      long timeoutMillis) throws Exception {
     WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(helixManager, workFlowName);
 
     // If the helix job is deleted from some other thread or a completely external process,
@@ -127,15 +119,37 @@ public class HelixUtils {
     // 1) workflowContext did not get initialized ever, in which case we need to keep waiting, or
     // 2) it did get initialized but deleted soon after, in which case we should stop waiting
     // To overcome this issue, we wait here till workflowContext gets initialized
-
+    long start = System.currentTimeMillis();
     while (workflowContext == null || 
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, 
jobName)) == null) {
+      if (System.currentTimeMillis() - start > timeoutMillis) {
+        log.error("Job cannot be initialized within {} milliseconds, 
considered as an error", timeoutMillis);
+        throw new JobException("Job cannot be initialized within {} 
milliseconds, considered as an error");
+      }
       workflowContext = TaskDriver.getWorkflowContext(helixManager, 
workFlowName);
       Thread.sleep(1000);
       log.info("Waiting for work flow initialization.");
     }
+
     log.info("Work flow {} initialized", workFlowName);
   }
 
+  public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder,
+      String workFlowName,
+      String jobName,
+      TaskDriver helixTaskDriver,
+      HelixManager helixManager,
+      long workFlowExpiryTime) throws Exception {
+
+    WorkflowConfig workFlowConfig = new 
WorkflowConfig.Builder().setExpiry(workFlowExpiryTime, 
TimeUnit.SECONDS).build();
+    // Create a work flow for each job with the name being the queue name
+    Workflow workFlow = new 
Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName,
 jobConfigBuilder).build();
+    // start the workflow
+    helixTaskDriver.start(workFlow);
+    log.info("Created a work flow {}", workFlowName);
+
+    waitJobInitialization(helixManager, workFlowName, jobName, Long.MAX_VALUE);
+  }
+
   static void waitJobCompletion(HelixManager helixManager, String 
workFlowName, String jobName,
       Optional<Long> timeoutInSeconds) throws InterruptedException, 
TimeoutException {
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
index dc88a4e..0d973af 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -95,12 +95,8 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel
       DistributeJobResult rst = super.getResultFromUserContent();
       Assert.assertTrue(!rst.isEarlyStopped());
       String jobName = 
this.jobPlanningProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
-      String planningJobId = 
this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
-
       try {
-        String planningJobFromStore = 
this.jobsMapping.getPlanningJobId(jobName).get();
-        Assert.assertTrue(planningJobFromStore.equals(planningJobId));
-
+        
Assert.assertFalse(this.jobsMapping.getPlanningJobId(jobName).isPresent());
       } catch (Exception e) {
         Assert.fail(e.toString());
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
index 76adca4..e031d75 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
@@ -111,11 +111,10 @@ public class NonObservingFSJobCatalog extends FSJobCatalog {
       if (fs.exists(jobSpecPath)) {
         fs.delete(jobSpecPath, false);
         this.mutableMetrics.updateRemoveJobTime(startTime);
+        this.listeners.onDeleteJob(jobURI, jobSpec.getVersion());
       } else {
         LOGGER.warn("No file with URI:" + jobSpecPath + " is found. Deletion 
failed.");
       }
-
-      this.listeners.onDeleteJob(jobURI, jobSpec.getVersion());
     } catch (IOException e) {
       throw new RuntimeException("When removing a JobConf. file, issues 
unexpected happen:" + e.getMessage());
     } catch (SpecNotFoundException e) {

Reply via email to