This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a6dcfc8f give option to cancel helix workflow through Delete API (#3580)
7a6dcfc8f is described below

commit 7a6dcfc8f98229914c75d38d480b6e8af80a1085
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Mon Oct 24 15:46:03 2022 -0700

    give option to cancel helix workflow through Delete API (#3580)
    
    change log string
    
    change all waitToStop to new cancel method
    
    update imports
    
    address comments
    
    checkstyle
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  6 +++++
 ...GobblinHelixDistributeJobExecutionLauncher.java | 25 +++++++++----------
 .../gobblin/cluster/GobblinHelixJobLauncher.java   | 15 ++++++------
 .../gobblin/cluster/GobblinHelixJobScheduler.java  | 14 ++++++-----
 .../org/apache/gobblin/cluster/HelixUtils.java     | 28 ++++++++++++++++------
 5 files changed, 55 insertions(+), 33 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 4ae21c1d3..d2a60781d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -175,6 +175,12 @@ public class GobblinClusterConfigurationKeys {
   public static final String CANCEL_RUNNING_JOB_ON_DELETE = 
GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
   public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
 
+  // By default we cancel a job by calling the Helix stop API. In some cases, jobs just hang in the STOPPING state and prevent
+  // new jobs from being launched. We provide this config as an option to cancel jobs by calling the Delete API instead. Directly deleting
+  // a Helix workflow should be safe in the Gobblin world, as a Gobblin job is stateless for Helix since we implement our own state store.
+  public static final String CANCEL_HELIX_JOB_BY_DELETE = 
GOBBLIN_CLUSTER_PREFIX + "job.cancelHelixJobByDelete";
+  public static final boolean DEFAULT_CANCEL_HELIX_JOB_BY_DELETE = false;
+
   public static final String HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "job.stoppingStateTimeoutSeconds";
   public static final long DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = 
300;
   public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ;
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 2f0322549..f2ba083c5 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -110,6 +110,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
   @Getter
   private DistributeJobMonitor jobMonitor;
 
+  private final Config combinedConfigs;
+
   public GobblinHelixDistributeJobExecutionLauncher(Builder builder) {
     this.planningJobHelixManager = builder.planningJobHelixManager;
 
@@ -118,19 +120,19 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
     this.jobPlanningProps = builder.jobPlanningProps;
     this.jobSubmitted = false;
 
-    Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
+    combinedConfigs = ConfigUtils.propertiesToConfig(jobPlanningProps)
         .withFallback(ConfigUtils.propertiesToConfig(sysProps));
 
-    this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined,
+    this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(this.combinedConfigs,
         GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
     this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics;
-    this.nonBlockingMode = ConfigUtils.getBoolean(combined,
+    this.nonBlockingMode = ConfigUtils.getBoolean(this.combinedConfigs,
         GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
         
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
     this.helixMetrics = builder.helixMetrics;
     this.jobsMapping = builder.jobsMapping;
-    this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined,
+    this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(this.combinedConfigs,
         GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
   }
@@ -144,18 +146,17 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
       String planningJobId = getPlanningJobId(this.jobPlanningProps);
       try {
         if (this.cancellationRequested && !this.cancellationExecuted) {
-          // TODO : fix this when HELIX-1180 is completed
-          // work flow should never be deleted explicitly because it has a 
expiry time
-          // If cancellation is requested, we should set the job state to 
CANCELLED/ABORT
-          this.helixTaskDriver.waitToStop(planningJobId, 
this.helixJobStopTimeoutSeconds * 1000);
-          log.info("Stopped the workflow {}", planningJobId);
+          boolean cancelByDelete = 
ConfigUtils.getBoolean(this.combinedConfigs, 
GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+              
GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE);
+          HelixUtils.cancelWorkflow(planningJobId, 
this.planningJobHelixManager, helixJobStopTimeoutSeconds * 1000, 
cancelByDelete);
+          log.info("Canceled the workflow {}", planningJobId);
         }
       } catch (HelixException e) {
-        // Cancellation may throw an exception, but Helix set the job state to 
STOP and it should eventually stop
+        // Cancellation may throw an exception, but Helix set the job state to 
STOP/DELETE and it should eventually be cleaned up
         // We will keep this.cancellationExecuted and 
this.cancellationRequested to true and not propagate the exception
-        log.error("Failed to stop workflow {} in Helix", planningJobId, e);
+        log.error("Failed to cancel workflow {} in Helix", planningJobId, e);
       } catch (InterruptedException e) {
-        log.error("Thread interrupted while trying to stop the workflow {} in 
Helix", planningJobId);
+        log.error("Thread interrupted while trying to cancel the workflow {} 
in Helix", planningJobId);
         Thread.currentThread().interrupt();
       }
     }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 933ec64ce..6308ddc2d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -255,18 +255,17 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     if (this.jobSubmitted) {
       try {
         if (this.cancellationRequested && !this.cancellationExecuted) {
-          // TODO : fix this when HELIX-1180 is completed
-          // work flow should never be deleted explicitly because it has a 
expiry time
-          // If cancellation is requested, we should set the job state to 
CANCELLED/ABORT
-          this.helixTaskDriver.waitToStop(this.helixWorkFlowName, 
this.helixJobStopTimeoutSeconds * 1000);
-          log.info("stopped the workflow {}", this.helixWorkFlowName);
+          boolean cancelByDelete = ConfigUtils.getBoolean(jobConfig, 
GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+              
GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE);
+          HelixUtils.cancelWorkflow(this.helixWorkFlowName, this.helixManager, 
helixJobStopTimeoutSeconds * 1000, cancelByDelete);
+          log.info("Canceled the workflow {}", this.helixWorkFlowName);
         }
       } catch (RuntimeException e) {
-        // Cancellation may throw an exception, but Helix set the job state to 
STOP and it should eventually stop
+        // Cancellation may throw an exception, but Helix set the job state to 
STOP/DELETE and it should eventually be cleaned up
         // We will keep this.cancellationExecuted and 
this.cancellationRequested to true and not propagate the exception
-        log.error("Failed to stop workflow {} in Helix", helixWorkFlowName, e);
+        log.error("Failed to cancel workflow {} in Helix", helixWorkFlowName, 
e);
       } catch (InterruptedException e) {
-        log.error("Thread interrupted while trying to stop the workflow {} in 
Helix", helixWorkFlowName);
+        log.error("Thread interrupted while trying to cancel the workflow {} 
in Helix", helixWorkFlowName);
         Thread.currentThread().interrupt();
       }
     }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 7e43c1689..20f47da3e 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -34,7 +34,6 @@ import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
-import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -379,6 +378,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     Optional<String> distributedJobMode;
     Optional<String> planningJob = Optional.empty();
     Optional<String> actualJob = Optional.empty();
+    boolean cancelByDelete = 
PropertiesUtils.getPropAsBoolean(this.commonJobProperties, 
GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+        
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE));
 
     this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
 
@@ -396,12 +397,12 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
 
     if (planningJob.isPresent()) {
       LOGGER.info("Cancelling planning job helix workflow: {}", 
planningJob.get());
-      new 
TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJob.get(), 
this.helixJobStopTimeoutMillis);
+      HelixUtils.cancelWorkflow(planningJob.get(), 
this.taskDriverHelixManager.get(), this.helixJobStopTimeoutMillis, 
cancelByDelete);
     }
 
     if (actualJob.isPresent()) {
       LOGGER.info("Cancelling actual job helix workflow: {}", actualJob.get());
-      new TaskDriver(this.jobHelixManager).waitToStop(actualJob.get(), 
this.helixJobStopTimeoutMillis);
+      HelixUtils.cancelWorkflow(actualJob.get(), this.jobHelixManager, 
this.helixJobStopTimeoutMillis, cancelByDelete);
     }
 
     this.jobSchedulerMetrics.numCancellationStart.decrementAndGet();
@@ -430,9 +431,10 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
       if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
         String workflowId = 
jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
-        TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
-        taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
-        LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+        boolean cancelByDelete = PropertiesUtils.getPropAsBoolean(jobConfig, 
GobblinClusterConfigurationKeys.CANCEL_HELIX_JOB_BY_DELETE,
+            
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CANCEL_HELIX_JOB_BY_DELETE));
+        HelixUtils.cancelWorkflow(workflowId, this.jobHelixManager, 
helixJobStopTimeoutMillis, cancelByDelete);
+        LOGGER.info("Cancelled workflow: {}", deleteJobArrival.getJobName());
         //Wait until the cancelled job is complete.
         waitForJobCompletion(deleteJobArrival.getJobName());
       } else {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 185726c6b..ce217ed20 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -323,6 +323,19 @@ public class HelixUtils {
     }
   }
 
+  // Cancel the job by calling either Delete or Stop Helix API
+  public static void cancelWorkflow(String workflowName, HelixManager 
helixManager, long timeOut, boolean cancelByDelete)
+      throws InterruptedException {
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+    if (cancelByDelete) {
+      taskDriver.deleteAndWaitForCompletion(workflowName, timeOut);
+      log.info("Canceling Helix workflow: {} through delete API", 
workflowName);
+    } else {
+      taskDriver.waitToStop(workflowName, timeOut);
+      log.info("Canceling Helix workflow: {} through stop API", workflowName);
+    }
+  }
+
   static void deleteWorkflow (String workflowName, HelixManager helixManager, 
long timeOut) throws InterruptedException {
     TaskDriver taskDriver = new TaskDriver(helixManager);
     taskDriver.deleteAndWaitForCompletion(workflowName, timeOut);
@@ -340,10 +353,7 @@ public class HelixUtils {
     } catch (JobException e) {
       throw new RuntimeException("Unable to cancel job " + jobName + ": ", e);
     }
-    // TODO : fix this when HELIX-1180 is completed
-    // We should not be deleting a workflow explicitly.
-    // Workflow state should be set to a final state, which will remove it 
automatically because expiry time is set.
-    // After that, all delete calls can be replaced by something like 
HelixUtils.setStateToFinal();
+    // Make sure the job is fully cleaned up
     HelixUtils.deleteStoppedHelixJob(helixManager, workFlowName, jobName);
     log.info("Stopped and deleted the workflow {}", workFlowName);
   }
@@ -358,14 +368,18 @@ public class HelixUtils {
    */
   private static void deleteStoppedHelixJob(HelixManager helixManager, String 
workFlowName, String jobName)
       throws InterruptedException {
+    long deleteTimeout = 10000L;
     WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while 
(workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, 
jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, 
jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, 
workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 
10000L);
+    if (workflowContext != null) {
+      // deleting the entire workflow, as one workflow contains only one job
+      new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 
deleteTimeout);
+    }
     log.info("Workflow deleted.");
   }
 

Reply via email to