homatthew commented on code in PR #3580: URL: https://github.com/apache/gobblin/pull/3580#discussion_r997652378

##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {

Review Comment:
   Maybe add a comment so future readers understand the subtlety of these null checks.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {
+      // deleting the entire workflow, as one workflow contains only one job
+      new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);

Review Comment:
   I am okay with hard-coding this timeout to 10 seconds, since deletion has typically happened very fast. But can we have a separate variable and a corresponding comment explaining that 10 seconds should be plenty of time and that we expect deletes to complete within a few seconds? Just to avoid magic numbers and improve readability for future readers.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {

Review Comment:
   These null checks are there to make sure this code doesn't fail in the delete scenario, right?

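For illustration, the two requests above (a commented null check and a named timeout) could look roughly like the sketch below. It is not the PR's code: `StoppedJobCleanup`, `WorkflowView`, `WorkflowDeleter`, and `JOB_DELETE_TIMEOUT_MS` are hypothetical names standing in for the Helix `WorkflowContext` lookup and the `TaskDriver` delete call, so the snippet compiles without Helix on the classpath. It also assumes, as the question above suggests, that a null context means the workflow has already been deleted.

```java
import java.util.function.Supplier;

/** Sketch only: the nested interfaces are hypothetical stand-ins for Helix types. */
final class StoppedJobCleanup {

  /**
   * Deletes have typically completed within a few seconds, so 10 seconds
   * should be plenty of time to wait for one.
   */
  static final long JOB_DELETE_TIMEOUT_MS = 10_000L;

  /** Hypothetical stand-in for the WorkflowContext job-state lookup. */
  interface WorkflowView {
    String jobState(String jobName);
  }

  /** Hypothetical stand-in for the TaskDriver delete-and-wait call. */
  interface WorkflowDeleter {
    void deleteAndWait(String workflowName, long timeoutMs) throws InterruptedException;
  }

  /** Returns true if a delete was issued, false if the workflow was already gone. */
  static boolean deleteStoppedJob(Supplier<WorkflowView> lookup, WorkflowDeleter deleter,
      String workflowName, String jobName, long pollIntervalMs) throws InterruptedException {
    WorkflowView workflow = lookup.get();
    // Assumption: a null view means the workflow was already deleted (for example
    // by a cancel-by-delete), so there is no job state left to wait on.
    while (workflow != null && !"STOPPED".equals(workflow.jobState(jobName))) {
      Thread.sleep(pollIntervalMs);
      workflow = lookup.get();
    }
    if (workflow == null) {
      // Nothing to delete; calling delete here could fail on a missing workflow.
      return false;
    }
    // One workflow contains only one job, so the whole workflow is deleted.
    deleter.deleteAndWait(workflowName, JOB_DELETE_TIMEOUT_MS);
    return true;
  }
}
```

Naming the timeout keeps the rationale next to the value, and the comment on the null check records why the loop and the delete are both guarded.
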


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -323,6 +323,19 @@ static boolean isJobFinished(String workflowName, String jobName, HelixManager h
     }
   }
+  // Cancel the job by calling either Delete or Stop Helix API
+  public static void cancelWorkflow(String workflowName, HelixManager helixManager, long timeOut, boolean cancelByDelete)
+      throws InterruptedException {
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+    if(cancelByDelete) {

Review Comment:
   nit: add whitespace between the `if` and `(`.
   
   We should probably add a linter 😵‍💫



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org