This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ac0671  [GOBBLIN-1319] fix cancellation in gobblin cluster jobs
0ac0671 is described below

commit 0ac0671251f39f9a922e68267894099cb668e896
Author: Arjun <[email protected]>
AuthorDate: Mon Nov 30 13:57:54 2020 -0800

    [GOBBLIN-1319] fix cancellation in gobblin cluster jobs
    
    Closes #3155 from arjun4084346/flowCancel
---
 .../apache/gobblin/runtime/api/SpecExecutor.java   |   3 +-
 .../gobblin/cluster/FsJobConfigurationManager.java |   3 +
 .../apache/gobblin/cluster/GobblinHelixJob.java    |   4 +-
 .../gobblin/cluster/GobblinHelixJobScheduler.java  |  48 +++++++-
 .../gobblin/cluster/GobblinHelixJobTask.java       |  19 ++--
 .../apache/gobblin/cluster/HelixJobsMapping.java   |  68 ++++++++----
 .../cluster/HelixRetriggeringJobCallable.java      | 123 ++++++++++++++-------
 .../gobblin/cluster/JobConfigurationManager.java   |   6 +
 .../cluster/ScheduledJobConfigurationManager.java  |   6 +-
 .../cluster/StreamingJobConfigurationManager.java  |   5 +-
 .../cluster/event/CancelJobConfigArrivalEvent.java |  38 +++++++
 .../service/StreamingKafkaSpecConsumer.java        |  12 ++
 .../gobblin/runtime/api/JobCatalogListener.java    |  18 +++
 .../gobblin/runtime/api/MutableJobCatalog.java     |   4 +
 .../job_catalog/JobCatalogListenersList.java       |  11 ++
 .../job_catalog/NonObservingFSJobCatalog.java      |  10 ++
 .../runtime/job_monitor/KafkaJobMonitor.java       |   2 +-
 .../runtime/job_monitor/MockedKafkaJobMonitor.java |   2 +-
 18 files changed, 300 insertions(+), 82 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
index 8569275..c045c5a 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -59,7 +59,8 @@ public interface SpecExecutor {
     ADD(1, "add"),
     UPDATE(2, "update"),
     DELETE(3, "delete"),
-    UNKNOWN(4, "unknown");
+    UNKNOWN(4, "unknown"),
+    CANCEL(5, "cancel");
 
     private int _id;
     private String _verb;
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
index 3b65e42..2563201 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
@@ -114,6 +114,9 @@ public class FsJobConfigurationManager extends 
JobConfigurationManager {
           this.jobCatalogOptional.get().remove(jobSpec.getUri());
         }
         postDeleteJobConfigArrival(jobSpec.getUri().toString(), 
jobSpec.getConfigAsProperties());
+      } else if (verb.equals(SpecExecutor.Verb.CANCEL)) {
+          // Handle cancel
+          postCancelJobConfigArrival(jobSpec.getUri().toString());
       }
 
       try {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index db34e3e..fe3cc17 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -44,7 +44,7 @@ import org.apache.gobblin.scheduler.JobScheduler;
 @Alpha
 @Slf4j
 public class GobblinHelixJob extends BaseGobblinJob implements 
InterruptableJob {
-  private Future cancellable = null;
+  private Future<?> cancellable = null;
 
   @Override
   public void executeImpl(JobExecutionContext context) throws 
JobExecutionException {
@@ -56,7 +56,7 @@ public class GobblinHelixJob extends BaseGobblinJob 
implements InterruptableJob
     final JobListener jobListener = (JobListener) 
dataMap.get(JobScheduler.JOB_LISTENER_KEY);
 
     try {
-      if 
(Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
+      if 
(Boolean.parseBoolean(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
               
Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT))))
 {
         jobScheduler.runJob(jobProps, jobListener);
       } else {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 8959378..e26f5dd 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -50,6 +51,7 @@ import com.google.common.util.concurrent.Striped;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
 import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
 import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
 import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
@@ -84,7 +86,7 @@ import org.apache.gobblin.util.PropertiesUtils;
  * <p> More details can be found at {@link HelixRetriggeringJobCallable}.
  */
 @Alpha
-public class GobblinHelixJobScheduler extends JobScheduler implements 
StandardMetricsBridge{
+public class GobblinHelixJobScheduler extends JobScheduler implements 
StandardMetricsBridge {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinHelixJobScheduler.class);
   private static final String COMMON_JOB_PROPS = "gobblin.common.job.props";
@@ -205,9 +207,9 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
     if (cleanAllDistJobs) {
       for (org.apache.gobblin.configuration.State state : 
this.jobsMapping.getAllStates()) {
-        String jobName = state.getId();
-        LOGGER.info("Delete mapping for job " + jobName);
-        this.jobsMapping.deleteMapping(jobName);
+        String jobUri = state.getId();
+        LOGGER.info("Delete mapping for job " + jobUri);
+        this.jobsMapping.deleteMapping(jobUri);
       }
     }
   }
@@ -346,7 +348,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
   }
 
   private void waitForJobCompletion(String jobName) {
-    while (this.jobRunningMap.getOrDefault(jobName, false) != false) {
+    while (this.jobRunningMap.getOrDefault(jobName, false)) {
       LOGGER.info("Waiting for job {} to stop...", jobName);
       try {
         Thread.sleep(1000);
@@ -367,6 +369,42 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     }
   }
 
+  /** Stops the Helix workflow recorded in jobsMapping for the cancelled job URI, if one is recorded. */
+  @Subscribe
+  public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobArrival)
+      throws InterruptedException {
+    String jobUri = cancelJobArrival.getJoburi();
+    LOGGER.info("Received cancel for job configuration of job " + jobUri);
+
+    // Incremented on entry and decremented in the finally block, so no exit path can leak a count.
+    this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
+    try {
+      Optional<String> planningJob = Optional.empty();
+      Optional<String> actualJob = Optional.empty();
+      Optional<String> distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri);
+      if (distributedJobMode.isPresent() && Boolean.parseBoolean(distributedJobMode.get())) {
+        planningJob = this.jobsMapping.getPlanningJobId(jobUri);
+      } else {
+        actualJob = this.jobsMapping.getActualJobId(jobUri);
+      }
+
+      if (planningJob.isPresent()) {
+        LOGGER.info("Cancelling planning job helix workflow: {}", planningJob.get());
+        new TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJob.get(), this.helixJobStopTimeoutMillis);
+      }
+
+      if (actualJob.isPresent()) {
+        LOGGER.info("Cancelling actual job helix workflow: {}", actualJob.get());
+        new TaskDriver(this.jobHelixManager).waitToStop(actualJob.get(), this.helixJobStopTimeoutMillis);
+      }
+    } catch (IOException e) {
+      // IOException is declared only by the jobsMapping lookups; without the mapping nothing can be stopped.
+      LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri, e);
+    } finally {
+      this.jobSchedulerMetrics.numCancellationStart.decrementAndGet();
+    }
+  }
+
   private void cancelJobIfRequired(DeleteJobConfigArrivalEvent 
deleteJobArrival) throws InterruptedException {
     Properties jobConfig = deleteJobArrival.getJobConfig();
     if (PropertiesUtils.getPropAsBoolean(jobConfig, 
GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index aa2d7fa..78127b2 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -65,7 +65,7 @@ class GobblinHelixJobTask implements Task {
   private final String planningJobId;
   private final HelixManager jobHelixManager;
   private final Path appWorkDir;
-  private final String jobName;
+  private final String jobUri;
   private final List<? extends Tag<?>> metadataTags;
   private GobblinHelixJobLauncher launcher;
   private GobblinHelixJobTaskMetrics jobTaskMetrics;
@@ -100,7 +100,7 @@ class GobblinHelixJobTask implements Task {
       throw new RuntimeException("Job doesn't have planning ID");
     }
 
-    this.jobName = 
jobPlusSysConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+    this.jobUri = 
jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI);
     this.planningJobId = 
jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
     this.jobsMapping = jobsMapping;
     this.appWorkDir = builder.getAppWorkPath();
@@ -149,7 +149,7 @@ class GobblinHelixJobTask implements Task {
     
this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
 
     try (Closer closer = Closer.create()) {
-      Optional<String> planningIdFromStateStore = 
this.jobsMapping.getPlanningJobId(jobName);
+      Optional<String> planningIdFromStateStore = 
this.jobsMapping.getPlanningJobId(jobUri);
 
       long timeOut = PropertiesUtils.getPropAsLong(jobPlusSysConfig,
                                          
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS,
@@ -161,7 +161,7 @@ class GobblinHelixJobTask implements Task {
       }
 
       while (true) {
-        Optional<String> actualJobIdFromStateStore = 
this.jobsMapping.getActualJobId(jobName);
+        Optional<String> actualJobIdFromStateStore = 
this.jobsMapping.getActualJobId(jobUri);
         if (actualJobIdFromStateStore.isPresent()) {
           String previousActualJobId = actualJobIdFromStateStore.get();
           if (HelixUtils.isJobFinished(previousActualJobId, 
previousActualJobId, this.jobHelixManager)) {
@@ -186,7 +186,7 @@ class GobblinHelixJobTask implements Task {
 
         this.launcher = createJobLauncher();
 
-        this.jobsMapping.setActualJobId(jobName, this.planningJobId, 
this.launcher.getJobId());
+        this.jobsMapping.setActualJobId(jobUri, this.planningJobId, 
this.launcher.getJobId());
 
         closer.register(launcher).launchJob(this.jobLauncherListener);
 
@@ -205,9 +205,9 @@ class GobblinHelixJobTask implements Task {
     } finally {
       // always cleanup the job mapping for current job name.
       try {
-        this.jobsMapping.deleteMapping(jobName);
+        this.jobsMapping.deleteMapping(jobUri);
       } catch (Exception e) {
-        return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs 
mapping for job : " + jobName);
+        return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs 
mapping for job : " + jobUri);
       }
     }
   }
@@ -217,15 +217,16 @@ class GobblinHelixJobTask implements Task {
     log.info("Cancelling planning job {}", this.planningJobId);
     if (launcher != null) {
       try {
+        // TODO(review): confirm that this cancel also stops the Helix job submitted by run().
         launcher.cancelJob(this.jobLauncherListener);
       } catch (JobException e) {
         throw new RuntimeException("Unable to cancel planning job " + 
this.planningJobId + ": ", e);
       } finally {
         // always cleanup the job mapping for current job name.
         try {
-          this.jobsMapping.deleteMapping(jobName);
+          this.jobsMapping.deleteMapping(jobUri);
         } catch (Exception e) {
-          throw new RuntimeException("Cannot delete jobs mapping for job : " + 
jobName);
+          throw new RuntimeException("Cannot delete jobs mapping for job : " + 
jobUri);
         }
       }
     }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index 11a90da..ab4d87c 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -116,49 +116,73 @@ public class HelixJobsMapping {
     this.stateStore.delete(this.distributedStateStoreName, jobName);
   }
 
-  public void setPlanningJobId (String jobName, String planningJobId) throws 
IOException {
-    State state = getOrCreate(distributedStateStoreName, jobName);
-    state.setId(jobName);
+  public void setPlanningJobId (String jobUri, String planningJobId) throws 
IOException {
+    State state = getOrCreate(distributedStateStoreName, jobUri);
+    state.setId(jobUri);
     state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningJobId);
-    // fs state store use hdfs rename, which assumes the target file doesn't 
exist.
-    if (stateStore instanceof FsStateStore) {
-      this.deleteMapping(jobName);
-    }
-    this.stateStore.put(distributedStateStoreName, jobName, state);
+    writeToStateStore(jobUri, state);
   }
 
-  public void setActualJobId (String jobName, String planningJobId, String 
actualJobId) throws IOException {
-    State state = getOrCreate(distributedStateStoreName, jobName);
-    state.setId(jobName);
-    state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningJobId);
+  public void setActualJobId (String jobUri, String actualJobId) throws 
IOException {
+    setActualJobId(jobUri, null, actualJobId);
+  }
+
+  public void setActualJobId (String jobUri, String planningJobId, String 
actualJobId) throws IOException {
+    State state = getOrCreate(distributedStateStoreName, jobUri);
+    state.setId(jobUri);
+    if (null != planningJobId) {
+      state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, 
planningJobId);
+    }
     state.setProp(ConfigurationKeys.JOB_ID_KEY, actualJobId);
+    writeToStateStore(jobUri, state);
+  }
+
+  public void setDistributedJobMode(String jobUri, boolean distributedJobMode) 
throws IOException {
+    State state = getOrCreate(distributedStateStoreName, jobUri);
+    state.setId(jobUri);
+    
state.setProp(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, 
distributedJobMode);
+    writeToStateStore(jobUri, state);
+  }
+
+  private void writeToStateStore(String jobUri, State state) throws 
IOException {
     // fs state store use hdfs rename, which assumes the target file doesn't 
exist.
-    if (stateStore instanceof FsStateStore) {
-      this.deleteMapping(jobName);
+    if (this.stateStore instanceof FsStateStore) {
+      this.deleteMapping(jobUri);
     }
-    this.stateStore.put(distributedStateStoreName, jobName, state);
+    this.stateStore.put(distributedStateStoreName, jobUri, state);
   }
 
-  private Optional<String> getId (String jobName, String idName) throws 
IOException {
-    State state = this.stateStore.get(distributedStateStoreName, jobName, 
jobName);
+  private Optional<String> getId (String jobUri, String idName) throws 
IOException {
+    State state = this.stateStore.get(distributedStateStoreName, jobUri, 
jobUri);
     if (state == null) {
       return Optional.empty();
     }
 
     String id = state.getProp(idName);
 
-    return id == null? Optional.empty() : Optional.of(id);
+    return id == null ? Optional.empty() : Optional.of(id);
   }
 
   public List<State> getAllStates() throws IOException {
     return this.stateStore.getAll(distributedStateStoreName);
   }
 
-  public Optional<String> getActualJobId (String jobName) throws IOException {
-    return getId(jobName, ConfigurationKeys.JOB_ID_KEY);
+  public Optional<String> getActualJobId (String jobUri) throws IOException {
+    return getId(jobUri, ConfigurationKeys.JOB_ID_KEY);
+  }
+
+  public Optional<String> getPlanningJobId (String jobUri) throws IOException {
+    return getId(jobUri, GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
   }
 
-  public Optional<String> getPlanningJobId (String jobName) throws IOException 
{
-    return getId(jobName, GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+  public Optional<String> getDistributedJobMode(String jobUri) throws 
IOException {
+    State state = this.stateStore.get(distributedStateStoreName, jobUri, 
jobUri);
+    if (state == null) {
+      return Optional.empty();
+    }
+
+    String id = 
state.getProp(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED);
+
+    return id == null ? Optional.empty() : Optional.of(id);
   }
 }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index bfa2dce..251cb0e 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -153,6 +153,12 @@ class HelixRetriggeringJobCallable implements Callable {
         "false");
 
     try {
+        this.jobsMapping.setDistributedJobMode(this.jobUri, 
this.isDistributeJobEnabled);
+    } catch (IOException e) {
+        throw new JobException("Could not update jobsMapping", e);
+    }
+
+    try {
       if (this.isDistributeJobEnabled) {
         runJobExecutionLauncher();
       } else {
@@ -197,8 +203,25 @@ class HelixRetriggeringJobCallable implements Callable {
    */
   private void runJobLauncherLoop() throws JobException {
     try {
+      // Check whether a previous actual job for this job URI is still running
+      Optional<String> actualJobIdFromStore = 
jobsMapping.getActualJobId(this.jobUri);
+
+      if (actualJobIdFromStore.isPresent() && 
!canRun(actualJobIdFromStore.get(), this.jobHelixManager)) {
+        return;
+      }
+
+      String actualJobId = jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)
+              ? jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY) : 
HelixJobsMapping.createActualJobId(jobProps);
+      log.info("Job {} creates actual job {}", this.jobUri, actualJobId);
+
+      jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, actualJobId);
+
+      /* create the job launcher after setting ConfigurationKeys.JOB_ID_KEY */
+      currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps);
+
+      this.jobsMapping.setActualJobId(this.jobUri, actualJobId);
+
       while (true) {
-        currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps);
         // in "run once" case, job scheduler will remove current job from the 
scheduler
         boolean isEarlyStopped = this.jobScheduler.runJob(jobProps, 
jobListener, currentJobLauncher);
         boolean isRetriggerEnabled = this.isRetriggeringEnabled();
@@ -207,11 +230,19 @@ class HelixRetriggeringJobCallable implements Callable {
         } else {
           break;
         }
-        currentJobLauncher = null;
       }
+
     } catch (Exception e) {
       log.error("Failed to run job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
       throw new JobException("Failed to run job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+    } finally {
+      currentJobLauncher = null;
+      // always clean up the job mapping for the current job URI.
+      try {
+        this.jobsMapping.deleteMapping(jobUri);
+      } catch (Exception e) {
+        throw new JobException("Cannot delete jobs mapping for job : " + 
jobUri);
+      }
     }
   }
 
@@ -225,7 +256,6 @@ class HelixRetriggeringJobCallable implements Callable {
   private void runJobExecutionLauncher() throws JobException {
     long startTime = 0;
     String newPlanningId;
-    String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
     Closer closer = Closer.create();
     try {
       HelixManager planningJobManager = 
this.taskDriverHelixManager.isPresent()?
@@ -235,42 +265,19 @@ class HelixRetriggeringJobCallable implements Callable {
           GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
 
       // Check if any existing planning job is running
-      Optional<String> planningJobIdFromStore = 
jobsMapping.getPlanningJobId(jobName);
+      Optional<String> planningJobIdFromStore = 
jobsMapping.getPlanningJobId(this.jobUri);
       boolean nonblocking = false;
       // start of critical section to check if a job with same job name is 
running
-      Lock jobLock = locks.get(jobName);
+      Lock jobLock = locks.get(this.jobUri);
       jobLock.lock();
 
       try {
-        if (planningJobIdFromStore.isPresent()) {
-          String previousPlanningJobId = planningJobIdFromStore.get();
-
-          if (HelixUtils.isJobFinished(previousPlanningJobId, 
previousPlanningJobId, planningJobManager)) {
-            log.info("Previous planning job {} has reached to the final state. 
Start a new one.", previousPlanningJobId);
-          } else {
-            boolean killDuplicateJob = PropertiesUtils
-                .getPropAsBoolean(this.jobProps, 
GobblinClusterConfigurationKeys.KILL_DUPLICATE_PLANNING_JOB, 
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_KILL_DUPLICATE_PLANNING_JOB));
-
-            if (!killDuplicateJob) {
-              log.info("Previous planning job {} has not finished yet. Skip 
this job.", previousPlanningJobId);
-              return;
-            } else {
-              log.info("Previous planning job {} has not finished yet. Kill 
it.", previousPlanningJobId);
-              long timeOut = PropertiesUtils.getPropAsLong(sysProps, 
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS,
-                  
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS) 
* 1000;
-              try {
-                HelixUtils.deleteWorkflow(previousPlanningJobId, 
planningJobManager, timeOut);
-              } catch (HelixException e) {
-                log.info("Helix cannot delete previous planning job id {} 
within {} seconds.", previousPlanningJobId,
-                    timeOut / 1000);
-                throw new JobException("Helix cannot delete previous planning 
job id " + previousPlanningJobId, e);
-              }
-            }
-          }
-        } else {
-          log.info("Planning job for {} does not exist. First time run.", 
jobName);
+        if (planningJobIdFromStore.isPresent() && 
!canRun(planningJobIdFromStore.get(), planningJobManager)) {
+          return;
         }
 
+        log.info("Planning job for {} does not exist. First time run.", 
this.jobUri);
+
         GobblinHelixDistributeJobExecutionLauncher.Builder builder = 
GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(
             new 
ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
 
@@ -297,13 +304,14 @@ class HelixRetriggeringJobCallable implements Callable {
         Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
             .withFallback(ConfigUtils.propertiesToConfig(sysProps));
 
-        nonblocking = ConfigUtils.getBoolean(combined, 
GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
-            
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
+        nonblocking = ConfigUtils
+            .getBoolean(combined, 
GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
+                
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
 
         log.info("Planning job {} started.", newPlanningId);
         GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
         closer.register(launcher);
-        this.jobsMapping.setPlanningJobId(jobName, newPlanningId);
+        this.jobsMapping.setPlanningJobId(this.jobUri, newPlanningId);
         startTime = System.currentTimeMillis();
         this.currentJobMonitor = launcher.launchJob(null);
 
@@ -311,7 +319,6 @@ class HelixRetriggeringJobCallable implements Callable {
         // so that the same critical section check (querying Helix for job 
completeness)
         // can be applied.
         HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, 
newPlanningId);
-
       } finally {
         // end of the critical section to check if a job with same job name is 
running
         jobLock.unlock();
@@ -334,15 +341,53 @@ class HelixRetriggeringJobCallable implements Callable {
       if (startTime != 0) {
         
this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
       }
-      log.error("Failed to run planning job {}", jobName, e);
-      throw new JobException("Failed to run planning job " + jobName, e);
+      log.error("Failed to run planning job for {}", this.jobUri, e);
+      throw new JobException("Failed to run planning job for " + this.jobUri, 
e);
     } finally {
       try {
         closer.close();
       } catch (IOException e) {
-        throw new JobException("Cannot properly close planning job " + 
jobName, e);
+        throw new JobException("Cannot properly close planning job for " + 
this.jobUri, e);
+      }
+    }
+  }
+
+  /**
+   * Checks whether a job can be submitted to Helix for execution. A job can run
+   * 1) if the previous job with the same job id has finished, or 2) if the previous job is still running
+   * but may be killed according to {@link GobblinClusterConfigurationKeys#KILL_DUPLICATE_PLANNING_JOB}
+   * (default {@link GobblinClusterConfigurationKeys#DEFAULT_KILL_DUPLICATE_PLANNING_JOB}); it is then deleted.
+   * @param previousJobId job id from the previous execution; callers pass planning as well as actual job ids
+   * @param helixManager helix manager used to check and delete the previous job
+   * @return true if the job can run on Helix, false if the unfinished previous job must be kept
+   * @throws JobException if previous job deletion fails
+   * @throws InterruptedException NOTE(review): presumably raised by the HelixUtils calls -- confirm
+   */
+  private boolean canRun(String previousJobId, HelixManager helixManager) throws JobException, InterruptedException {
+    if (HelixUtils.isJobFinished(previousJobId, previousJobId, helixManager)) {
+      log.info("Previous job {} has reached to the final state. Start a new one.", previousJobId);
+    } else {
+      boolean killDuplicateJob = PropertiesUtils
+          .getPropAsBoolean(this.jobProps, GobblinClusterConfigurationKeys.KILL_DUPLICATE_PLANNING_JOB, String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_KILL_DUPLICATE_PLANNING_JOB));
+
+      if (!killDuplicateJob) {
+        log.info("Previous job {} has not finished yet. Skip this job.", previousJobId);
+        return false;
+      } else {
+        log.info("Previous job {} has not finished yet. Kill it.", previousJobId);
+        long timeOut = PropertiesUtils
+            .getPropAsLong(sysProps, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS,
+                GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS) * 1000; // seconds -> milliseconds
+        try {
+          HelixUtils.deleteWorkflow(previousJobId, helixManager, timeOut);
+        } catch (HelixException e) {
+          log.info("Helix cannot delete previous job id {} within {} seconds.", previousJobId,
+              timeOut / 1000);
+          throw new JobException("Helix cannot delete previous job id " + previousJobId, e);
+        }
       }
     }
+    return true;
   }
 
   void cancel() throws JobException {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
index c2ba8a4..86b5219 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
 import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,4 +133,9 @@ public class JobConfigurationManager extends 
AbstractIdleService implements Stan
     LOGGER.info(String.format("Posting delete JobConfig with name: %s and 
config: %s", jobName, jobConfig));
     this.eventBus.post(new DeleteJobConfigArrivalEvent(jobName, jobConfig));
   }
+
+  protected void postCancelJobConfigArrival(String jobUri) {
+    LOGGER.info(String.format("Posting cancel JobConfig with name: %s", 
jobUri));
+    this.eventBus.post(new CancelJobConfigArrivalEvent(jobUri));
+  }
 }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
index e311507..601ed90 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
@@ -140,7 +140,11 @@ public class ScheduledJobConfigurationManager extends 
JobConfigurationManager {
         Spec anonymousSpec = (Spec) entry.getValue();
         postDeleteJobConfigArrival(anonymousSpec.getUri().toString(), new 
Properties());
         jobSpecs.remove(entry.getValue().getUri());
-      }
+      } else if (verb.equals(SpecExecutor.Verb.CANCEL)) {
+        // Handle cancel
+        Spec anonymousSpec = entry.getValue();
+        postCancelJobConfigArrival(anonymousSpec.getUri().toString());
+        }
     }
   }
 
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
index a18e759..d731e80 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
@@ -158,8 +158,11 @@ public class StreamingJobConfigurationManager extends 
JobConfigurationManager {
         postUpdateJobConfigArrival(jobSpec.getUri().toString(), 
jobSpec.getConfigAsProperties());
       } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
         // Handle delete
-        Spec anonymousSpec = (Spec) entry.getValue();
+        Spec anonymousSpec = entry.getValue();
         postDeleteJobConfigArrival(anonymousSpec.getUri().toString(), new 
Properties());
+      } else if (verb.equals(SpecExecutor.Verb.CANCEL)) {
+        Spec anonymousSpec = entry.getValue();
+        postCancelJobConfigArrival(anonymousSpec.getUri().toString());
       }
     }
   }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/CancelJobConfigArrivalEvent.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/CancelJobConfigArrivalEvent.java
new file mode 100644
index 0000000..6be9e8f
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/CancelJobConfigArrivalEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.event;
+
+import java.util.Properties;
+
+
+public class CancelJobConfigArrivalEvent {
+  private final String jobUri;
+
+  public CancelJobConfigArrivalEvent(String jobUri) {
+    this.jobUri = jobUri;
+  }
+
+  /**
+   * Get the job uri.
+   *
+   * @return the job uri
+   */
+  public String getJoburi() {
+    return this.jobUri;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index f7a0614..515a108 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -179,6 +179,18 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
       }
     }
 
+    @Override
+    public void onCancelJob(URI cancelledJobURI) {
+      super.onCancelJob(cancelledJobURI);
+      try {
+        JobSpec.Builder jobSpecBuilder = JobSpec.builder(cancelledJobURI);
+        jobSpecBuilder.withConfigAsProperties(new Properties());
+        _jobSpecQueue.put(new ImmutablePair<>(SpecExecutor.Verb.CANCEL, 
jobSpecBuilder.build()));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
     @Override public void onUpdateJob(JobSpec updatedJob) {
       super.onUpdateJob(updatedJob);
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
index c670573..7ef97e6 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
@@ -37,6 +37,9 @@ public interface JobCatalogListener {
    */
   public void onDeleteJob(URI deletedJobURI, String deletedJobVersion);
 
+  default void onCancelJob(URI cancelledJobURI) {
+  }
+
   /**
    * Invoked when the contents of a JobSpec gets updated in the catalog.
    */
@@ -76,6 +79,21 @@ public interface JobCatalogListener {
     }
   }
 
+  public static class CancelJobCallback extends Callback<JobCatalogListener, 
Void> {
+    private final URI _cancelledJobURI;
+
+    public CancelJobCallback(URI cancelledJobURI) {
+      super(Objects.toStringHelper("onCancelJob").add("cancelJob", 
cancelledJobURI).toString());
+      _cancelledJobURI = cancelledJobURI;
+    }
+
+    @Override
+    public Void apply(JobCatalogListener listener) {
+      listener.onCancelJob(_cancelledJobURI);
+      return null;
+    }
+  }
+
   public static class UpdateJobCallback extends Callback<JobCatalogListener, 
Void> {
     private final JobSpec _updatedJob;
     public UpdateJobCallback(JobSpec updatedJob) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
index 57dfce5..4c12140 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
@@ -48,6 +48,10 @@ public interface MutableJobCatalog extends JobCatalog {
    * */
   public void put(JobSpec jobSpec);
 
+  default void remove(URI uri, boolean alwaysTriggerListeners) {
+    throw new UnsupportedOperationException("Method not implemented by " + 
this.getClass());
+  }
+
   /**
    * Removes an existing JobSpec with the given URI. A no-op if such JobSpec 
does not exist.
    */
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogListenersList.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogListenersList.java
index 404d8f2..48fb737 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogListenersList.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogListenersList.java
@@ -99,6 +99,17 @@ public class JobCatalogListenersList implements 
JobCatalogListener, JobCatalogLi
   }
 
   @Override
+  public synchronized void onCancelJob(URI cancelledJobURI) {
+    Preconditions.checkNotNull(cancelledJobURI);
+
+    try {
+      _disp.execCallbacks(new CancelJobCallback(cancelledJobURI));
+    } catch (InterruptedException e) {
+      getLog().warn("onCancelJob interrupted.");
+    }
+  }
+
+  @Override
   public void close()
       throws IOException {
     _disp.close();
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
index e031d75..0cde6c0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
@@ -102,6 +102,10 @@ public class NonObservingFSJobCatalog extends FSJobCatalog 
{
    */
   @Override
   public synchronized void remove(URI jobURI) {
+      remove(jobURI, false);
+  }
+
+  public synchronized void remove(URI jobURI, boolean alwaysTriggerListeners) {
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is 
not running.", this.getClass().getName()));
     try {
       long startTime = System.currentTimeMillis();
@@ -119,6 +123,12 @@ public class NonObservingFSJobCatalog extends FSJobCatalog 
{
       throw new RuntimeException("When removing a JobConf. file, issues 
unexpected happen:" + e.getMessage());
     } catch (SpecNotFoundException e) {
       LOGGER.warn("No file with URI:" + jobURI + " is found. Deletion 
failed.");
+    } finally {
+      // HelixRetriggeringJobCallable deletes the job file after submitting it 
to Helix,
+      // so we have to trigger listeners regardless of whether the job 
file still exists, in order to cancel the job
+      if (alwaysTriggerListeners) {
+        this.listeners.onCancelJob(jobURI);
+      }
     }
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index ba9f06f..38c3c68 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -108,7 +108,7 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
         } else if (parsedMessage instanceof Either.Right) {
           this.removedSpecs.inc();
           URI jobSpecUri = ((Either.Right<JobSpec, URI>) 
parsedMessage).getRight();
-          this.jobCatalog.remove(jobSpecUri);
+          this.jobCatalog.remove(jobSpecUri, true);
           // Delete the job state if it is a delete spec request
           deleteStateStore(jobSpecUri);
         }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
index 0562534..b68bc1a 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
@@ -90,7 +90,7 @@ public class MockedKafkaJobMonitor extends KafkaJobMonitor {
         jobSpecs.remove(uri);
         return null;
       }
-    }).when(jobCatalog).remove(Mockito.any(URI.class));
+    }).when(jobCatalog).remove(Mockito.any(URI.class), Mockito.anyBoolean());
 
     return jobCatalog;
   }

Reply via email to