[
https://issues.apache.org/jira/browse/GOBBLIN-1721?focusedWorklogId=818126&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-818126
]
ASF GitHub Bot logged work on GOBBLIN-1721:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Oct/22 19:46
Start Date: 18/Oct/22 19:46
Worklog Time Spent: 10m
Work Description: homatthew commented on code in PR #3580:
URL: https://github.com/apache/gobblin/pull/3580#discussion_r997652378
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {
Review Comment:
Maybe add a comment for future readers to understand the subtlety of these
null checks
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {
+      // deleting the entire workflow, as one workflow contains only one job
+      new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
Review Comment:
I am okay with hard-coding this timeout to 10 seconds, since deletion has
typically been very fast. But can we move it into a separate variable with a
comment explaining that 10 seconds should be plenty of time, because we expect
deletes to finish within a few seconds?
That avoids a magic number and improves readability for future readers.
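A minimal sketch of the named-constant suggestion follows. The constant name
`JOB_DELETE_TIMEOUT_MS` and the `WorkflowDeleter` interface are hypothetical
stand-ins chosen for illustration; they are not taken from the PR or from Helix,
and the stand-in omits the checked exception the real call may declare.

```java
import java.util.concurrent.TimeUnit;

public class DeleteTimeoutSketch {
  // Deletes have typically finished within a few seconds, so 10 seconds should be plenty.
  // Hypothetical constant name, not taken from the PR.
  public static final long JOB_DELETE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);

  /** Hypothetical stand-in for the delete call made through Helix's TaskDriver. */
  public interface WorkflowDeleter {
    void deleteAndWaitForCompletion(String workflowName, long timeoutMs);
  }

  // Passes the named timeout instead of a bare 10000L literal.
  public static void deleteWorkflow(WorkflowDeleter deleter, String workflowName) {
    deleter.deleteAndWaitForCompletion(workflowName, JOB_DELETE_TIMEOUT_MS);
  }
}
```

The call site then reads as "delete with the job delete timeout", and the
reasoning behind the value lives next to the constant.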
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -359,13 +369,16 @@ static void handleJobTimeout(String workFlowName, String jobName, HelixManager h
   private static void deleteStoppedHelixJob(HelixManager helixManager, String workFlowName, String jobName)
       throws InterruptedException {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-    while (workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
+    while (workflowContext != null &&
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) != STOPPED) {
       log.info("Waiting for job {} to stop...", jobName);
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
       Thread.sleep(1000);
     }
-    // deleting the entire workflow, as one workflow contains only one job
-    new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L);
+    if (workflowContext != null) {
Review Comment:
These null checks are to make sure this code doesn't fail in the delete
scenario, right?
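For reference, the pattern in question can be sketched in isolation as below.
This assumes the context lookup returns null once the workflow has been
deleted. `Context`, `JobState`, and `waitForStop` are hypothetical stand-ins for
illustration, not the Helix types, and the sketch wraps the interrupt in an
unchecked exception for brevity.

```java
import java.util.function.Supplier;

public class NullSafeWaitSketch {
  public enum JobState { STOPPING, STOPPED }

  /** Hypothetical stand-in for Helix's WorkflowContext. */
  public interface Context {
    JobState getJobState(String jobName);
  }

  /**
   * Polls until the job reports STOPPED or the context disappears.
   * Returns true if a context still exists afterwards (a delete is still needed),
   * false if the workflow was already removed.
   */
  public static boolean waitForStop(Supplier<Context> lookup, String jobName, long pollMillis) {
    Context context = lookup.get();
    // A null context means the workflow is already gone, so there is nothing to wait for.
    while (context != null && context.getJobState(jobName) != JobState.STOPPED) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for job " + jobName, e);
      }
      context = lookup.get();
    }
    return context != null;
  }
}
```

Under that assumption, a concurrent delete ends the loop instead of causing a
NullPointerException, and the caller can skip its own delete when the method
returns false.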
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java:
##########
@@ -323,6 +323,19 @@ static boolean isJobFinished(String workflowName, String jobName, HelixManager h
     }
   }
+  // Cancel the job by calling either Delete or Stop Helix API
+  public static void cancelWorkflow(String workflowName, HelixManager helixManager, long timeOut, boolean cancelByDelete)
+      throws InterruptedException {
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+    if(cancelByDelete) {
Review Comment:
nit: add whitespace between the `if` and the `(`
We probably should add a linter 😵💫
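With the spacing nit applied, the shape of the method could look roughly like
the sketch below. The rest of the method is not visible in this hunk, so the
two branch bodies are an assumption based on the issue description (delete
versus stop), and `Driver` is a hypothetical stand-in rather than the Helix
TaskDriver class.

```java
public class CancelWorkflowSketch {
  /** Hypothetical stand-in for the two driver calls discussed in this PR. */
  public interface Driver {
    void deleteAndWaitForCompletion(String workflowName, long timeoutMs);
    void waitToStop(String workflowName, long timeoutMs);
  }

  // Cancel the workflow through either the delete path or the stop path.
  public static void cancelWorkflow(String workflowName, Driver driver, long timeOut, boolean cancelByDelete) {
    if (cancelByDelete) {
      // Assumed branch: remove the workflow outright instead of waiting for it to stop.
      driver.deleteAndWaitForCompletion(workflowName, timeOut);
    } else {
      // Assumed branch: the pre-existing behaviour of stopping and waiting.
      driver.waitToStop(workflowName, timeOut);
    }
  }
}
```

Keeping the choice behind a single boolean lets callers switch between the two
paths without touching the call site logic.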
Issue Time Tracking
-------------------
Worklog Id: (was: 818126)
Time Spent: 1h (was: 50m)
> Give option to cancel helix workflow through Delete API to avoid job hanging
> ----------------------------------------------------------------------------
>
> Key: GOBBLIN-1721
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1721
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-cluster
> Reporter: Hanghang Liu
> Assignee: Hung Tran
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Currently, when we receive a job restart (handleUpdateJobConfigArrival),
> GobblinHelixJobLauncher first calls helixTaskDriver.waitToStop to stop
> the workflow, then launches the new one. We have observed Helix taking
> exceptionally long to stop the workflow, leaving the job state stuck in
> STOPPING. This makes waitToStop time out and throw an exception every
> time, so the new flow is never able to launch.
> Since our job is stateless for Helix, we can use the Delete API in this case
> to avoid the job hanging.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)