[ 
https://issues.apache.org/jira/browse/GOBBLIN-1319?focusedWorklogId=515659&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-515659
 ]

ASF GitHub Bot logged work on GOBBLIN-1319:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Nov/20 17:04
            Start Date: 23/Nov/20 17:04
    Worklog Time Spent: 10m 
      Work Description: sv2000 commented on a change in pull request #3155:
URL: https://github.com/apache/incubator-gobblin/pull/3155#discussion_r528846286



##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
##########
@@ -367,6 +369,42 @@ public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobAr
     }
   }
 
+  @Subscribe
+  public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobArrival)
+      throws InterruptedException {
+    String jobUri = cancelJobArrival.getJoburi();
+    LOGGER.info("Received cancel for job configuration of job " + jobUri);
+    Optional<String> distributedJobMode;
+    Optional<String> planningJob = Optional.empty();
+    Optional<String> actualJob = Optional.empty();
+
+    this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
+
+    try {
+      distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri);
+      if (distributedJobMode.isPresent() && Boolean.parseBoolean(distributedJobMode.get())) {
+        planningJob = this.jobsMapping.getPlanningJobId(jobUri);
+      } else {
+        actualJob = this.jobsMapping.getActualJobId(jobUri);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri);
+      return;
+    }
+
+    if (planningJob.isPresent()) {
+      LOGGER.info("Cancelling planning job helix workflow: {}", planningJob.get());
+      new TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJob.get(), this.helixJobStopTimeoutMillis);

Review comment:
       We should log the outcome of the cancellation attempt. Was the job
stopped successfully? If not, should the attempt be retried?
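
One way the suggestion could look, as a minimal self-contained sketch rather than the actual change: the `WorkflowStopper` interface, the `cancelWithRetry` helper, the `maxAttempts` parameter, and the use of `TimeoutException` to signal a failed stop are all hypothetical stand-ins introduced for illustration, not Helix or Gobblin APIs. It logs whether each stop attempt succeeded and retries a bounded number of times.

```java
import java.util.concurrent.TimeoutException;

public class CancelWithOutcome {
  /** Hypothetical stand-in for the stop call in the diff; not a Helix or Gobblin API. */
  interface WorkflowStopper {
    void waitToStop(String workflow, long timeoutMillis)
        throws InterruptedException, TimeoutException;
  }

  /**
   * Tries to stop the workflow up to maxAttempts times and logs the outcome
   * of every attempt. Returns true if one attempt completed without a timeout.
   */
  static boolean cancelWithRetry(WorkflowStopper stopper, String workflow,
      long timeoutMillis, int maxAttempts) throws InterruptedException {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        stopper.waitToStop(workflow, timeoutMillis);
        System.out.println("Stopped workflow " + workflow + " on attempt " + attempt);
        return true;
      } catch (TimeoutException e) {
        // Assumption: a failed stop surfaces as an exception from the stop call.
        System.out.println("Attempt " + attempt + " of " + maxAttempts
            + " to stop workflow " + workflow + " failed: " + e.getMessage());
      }
    }
    System.out.println("Giving up on stopping workflow " + workflow);
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    // Simulated stopper that times out on the first call and succeeds afterwards.
    int[] calls = {0};
    WorkflowStopper flaky = (wf, timeout) -> {
      if (++calls[0] < 2) {
        throw new TimeoutException("not stopped within " + timeout + " ms");
      }
    };
    boolean stopped = cancelWithRetry(flaky, "example_workflow", 1000L, 3);
    System.out.println("stopped=" + stopped + " calls=" + calls[0]);
  }
}
```

In the handler under review, the boolean result could drive a success or failure metric next to `numCancellationStart`; whether a retry is appropriate at all is the open question raised above.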

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
##########
@@ -367,6 +369,42 @@ public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobAr
     }
   }
 
+  @Subscribe
+  public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobArrival)
+      throws InterruptedException {
+    String jobUri = cancelJobArrival.getJoburi();
+    LOGGER.info("Received cancel for job configuration of job " + jobUri);
+    Optional<String> distributedJobMode;
+    Optional<String> planningJob = Optional.empty();
+    Optional<String> actualJob = Optional.empty();
+
+    this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
+
+    try {
+      distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri);
+      if (distributedJobMode.isPresent() && Boolean.parseBoolean(distributedJobMode.get())) {
+        planningJob = this.jobsMapping.getPlanningJobId(jobUri);
+      } else {
+        actualJob = this.jobsMapping.getActualJobId(jobUri);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri);
+      return;
+    }
+
+    if (planningJob.isPresent()) {
+      LOGGER.info("Cancelling planning job helix workflow: {}", planningJob.get());
+      new TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJob.get(), this.helixJobStopTimeoutMillis);
+    }
+
+    if (actualJob.isPresent()) {
+      LOGGER.info("Cancelling actual job helix workflow: {}", actualJob.get());
+      new TaskDriver(this.jobHelixManager).waitToStop(actualJob.get(), this.helixJobStopTimeoutMillis);

Review comment:
       Same comment as above: log the outcome of this cancellation attempt as well.

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
##########
@@ -196,22 +202,57 @@ private void deleteJobSpec() throws JobException {
    * the job launcher determines it is safe to stop.
    */
   private void runJobLauncherLoop() throws JobException {
+    String actualJobId;
+    Lock jobLock = locks.get(this.jobUri);

Review comment:
       Why is a lock needed now when it was not needed previously? What has
changed? Is there a race condition you are trying to address?

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
##########
@@ -334,15 +351,44 @@ private void runJobExecutionLauncher() throws JobException {
       if (startTime != 0) {
         this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
       }
-      log.error("Failed to run planning job {}", jobName, e);
-      throw new JobException("Failed to run planning job " + jobName, e);
+      log.error("Failed to run planning job for {}", this.jobUri, e);
+      throw new JobException("Failed to run planning job for " + this.jobUri, e);
     } finally {
       try {
         closer.close();
       } catch (IOException e) {
-        throw new JobException("Cannot properly close planning job " + jobName, e);
+        throw new JobException("Cannot properly close planning job for " + this.jobUri, e);
+      }
+    }
+  }
+
+  private boolean canRun(Optional<String> jobIdFromStore, HelixManager helixManager) throws JobException, InterruptedException {

Review comment:
       Can you document the behavior of canRun? Also, can this be unit tested?






Issue Time Tracking
-------------------

    Worklog Id:     (was: 515659)
    Time Spent: 2h  (was: 1h 50m)

> fix helix job cancellation in gobblin cluster
> ---------------------------------------------
>
>                 Key: GOBBLIN-1319
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1319
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
