[ 
https://issues.apache.org/jira/browse/GOBBLIN-1721?focusedWorklogId=818126&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-818126
 ]

ASF GitHub Bot logged work on GOBBLIN-1721:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Oct/22 19:46
            Start Date: 18/Oct/22 19:46
    Worklog Time Spent: 10m 
      Work Description: homatthew commented on code in PR #3580:
URL: https://github.com/apache/gobblin/pull/3580#discussion_r997652378


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {

Review Comment:
   Maybe add a comment so that future readers understand the subtlety of these 
null checks.
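
A minimal sketch of what such a commented null check could look like. The `Context` interface, the `JobState` enum, and the `deleteWhenStopped` helper are hypothetical stand-ins so the example compiles on its own; they are not the Helix or Gobblin types. The reading that a null context means the workflow is already gone is an assumption, not something confirmed in this thread.

```java
import java.util.function.Supplier;

// Hypothetical, self-contained sketch; not the actual HelixUtils code.
final class StoppedJobCleanup {
  enum JobState { IN_PROGRESS, STOPPING, STOPPED }

  /** Hypothetical stand-in for a workflow context lookup result. */
  interface Context {
    JobState getJobState(String namespacedJobName);
  }

  private StoppedJobCleanup() {}

  /**
   * Polls until the job is STOPPED, then runs the delete action.
   * Returns true if the delete action ran, false if the context was null
   * or the wait was interrupted.
   */
  static boolean deleteWhenStopped(Supplier<Context> contextLookup, String jobName,
      Runnable deleteAction, long pollMillis) {
    Context context = contextLookup.get();
    // ASSUMPTION: a null context means the workflow no longer exists (for
    // example, it was already deleted), so there is nothing to wait for and
    // dereferencing it would throw a NullPointerException.
    while (context != null && context.getJobState(jobName) != JobState.STOPPED) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up without deleting.
        Thread.currentThread().interrupt();
        return false;
      }
      context = contextLookup.get();
    }
    // Same assumption as above: only delete a workflow that still exists.
    if (context != null) {
      deleteAction.run();
      return true;
    }
    return false;
  }
}
```

With a lookup that always returns null, the helper returns false and never invokes the delete action.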



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {
+      // deleting the entire workflow, as one workflow contains only one job
+      new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);

Review Comment:
   I am okay with hard-coding this timeout value to 10 seconds, since the 
deletion has typically happened very fast. But can we have a separate variable 
and a corresponding comment explaining that 10 seconds should be plenty of 
time, and that we expect deletes to complete within a few seconds?
   
   This is just to avoid magic numbers and improve readability for future readers.
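
One way the suggestion could look, as a sketch only: the class name `WorkflowDeleteTimeout` and the constant name `WORKFLOW_DELETE_TIMEOUT_MS` are hypothetical and do not come from the PR.

```java
import java.time.Duration;

// Hypothetical holder class; in practice the constant would live next to
// the code that performs the delete.
final class WorkflowDeleteTimeout {
  /**
   * Upper bound on how long to wait for a workflow delete to complete.
   * Per the review discussion, deletes have typically finished within a few
   * seconds, so 10 seconds should leave plenty of headroom.
   */
  static final long WORKFLOW_DELETE_TIMEOUT_MS = Duration.ofSeconds(10).toMillis();

  private WorkflowDeleteTimeout() {}
}
```

The call in the diff would then read `deleteAndWaitForCompletion(workFlowName, WORKFLOW_DELETE_TIMEOUT_MS)` instead of passing the literal `10000L`.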



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {

Review Comment:
   These null checks are to make sure this code doesn't fail in the delete 
scenario, right?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -323,6 +323,19 @@ static boolean isJobFinished(String workflowName, String jobName, HelixManager h
     }
   }
 
+  // Cancel the job by calling either Delete or Stop Helix API
+  public static void cancelWorkflow(String workflowName, HelixManager helixManager, long timeOut, boolean cancelByDelete)
+      throws InterruptedException {
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+    if(cancelByDelete) {

Review Comment:
   nit: add whitespace between the `if` and `(`
   
   We probably should add a linter 😵‍💫





Issue Time Tracking
-------------------

    Worklog Id:     (was: 818126)
    Time Spent: 1h  (was: 50m)

> Give option to cancel helix workflow through Delete API to avoid job hanging
> ----------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1721
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1721
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-cluster
>            Reporter: Hanghang Liu
>            Assignee: Hung Tran
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently, when we receive a job restart (handleUpdateJobConfigArrival), 
> GobblinHelixJobLauncher first calls helixTaskDriver.waitToStop to stop 
> the workflow, then initiates the new one. We have observed Helix 
> taking exceptionally long to stop the workflow, leaving the job state 
> stuck in STOPPING. This makes waitToStop time out and throw an exception 
> every time, so the new flow can never launch.
> Since our job is stateless for Helix, we can use the Delete API in this case 
> to avoid the job hanging.
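
The choice described in the issue can be sketched roughly as below. This is not the code from the PR, whose method body is not shown in this message. The `Driver` interface is a hypothetical stand-in for the two task-driver operations named in the thread (`waitToStop` and `deleteAndWaitForCompletion`), with checked exceptions left out to keep the example short.

```java
// Hypothetical, self-contained sketch of choosing between stop and delete.
final class CancelSketch {
  /** Hypothetical stand-in for the driver operations mentioned in the issue. */
  interface Driver {
    void waitToStop(String workflowName, long timeoutMillis);
    void deleteAndWaitForCompletion(String workflowName, long timeoutMillis);
  }

  private CancelSketch() {}

  /**
   * Cancels a workflow either by deleting it or by stopping it.
   * Deleting is only reasonable under the issue's premise that the job is
   * stateless from Helix's point of view.
   */
  static void cancelWorkflow(String workflowName, Driver driver, long timeOut,
      boolean cancelByDelete) {
    if (cancelByDelete) {
      // Avoids waiting on a workflow that lingers in STOPPING.
      driver.deleteAndWaitForCompletion(workflowName, timeOut);
    } else {
      driver.waitToStop(workflowName, timeOut);
    }
  }
}
```

Keeping the stop path behind the flag leaves the earlier behavior available to callers that do not opt in to deletion.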



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
