homatthew commented on code in PR #3546:
URL: https://github.com/apache/gobblin/pull/3546#discussion_r956272277
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -431,7 +431,7 @@ private void
cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
String workflowId =
jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
- taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
+ taskDriver.stop(workflowId);
Review Comment:
Based on
https://github.com/apache/gobblin/blob/master/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java#L258
> One of our customer wants an option to cancel a helix job.
> As we discussed the other day, we have switched from using job queue to
> work flow and using expiry time for auto cleanup.
> However, on a cancel request, we cannot STOP the workflow as STOPPED is not
> a final state and will prevent the cleanup.
> So would you suggest that we should keep using the work around new
> TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 10000L) to
> clean the jobs after they are stopped?
>> My understanding is that Gobblin is no longer using JobQueues. It is
>> using workflows, which don't and cannot be stopped. That is why we closed it.
>> Let me know if you have any other questions.
It seems like we use this timeout in other places too, particularly in the
helix utils. Do we know why we were using timeouts like this in the first place
instead of this async wait approach?
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java:
##########
@@ -188,7 +184,7 @@ private Properties generateJobProperties(Config baseConfig,
String jobNameSuffix
// expiry time should be more than the time needed for the job to complete
// otherwise JobContext will become null. This is how Helix work flow
works.
-
properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
"5");
+
properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
"15");
Review Comment:
What does adjusting this value do? This key corresponds to:
- https://github.com/apache/helix/blob/386a77d566f1dc0b480c3bcbdb4a2880a8b8a4a9/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java#L409
I have 2 questions:
1. When stopping a workflow, I think we go from running -> stopped and then we
delete the workflow (i.e. it never reaches the completed state). So will this value
ever be used?
2. If it does do a cleanup, does increasing the value address our
issue? This code change says that it will clean up the workflow 15 seconds
after entering the `completed` state instead of 5. But the original issue was that
the workflow still existed (after 10 minutes), so the replanner skipped
creating a new workflow.
TL;DR: What does this value do if we are manually deleting the workflow? When
does Helix need to clean up the workflow?
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java:
##########
@@ -188,7 +184,7 @@ private Properties generateJobProperties(Config baseConfig,
String jobNameSuffix
// expiry time should be more than the time needed for the job to complete
// otherwise JobContext will become null. This is how Helix work flow
works.
-
properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
"5");
+
properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
"15");
Review Comment:
My understanding of Gobblin Helix workflow termination (please correct me if
there are misunderstandings):
1. Call stop asynchronously
   a. Helix sets the workflow to stop
   b. Helix job launcher is waiting for state to reach stopped
2. When job launcher sees state is stopped, delete workflow (current timeout
is 10 seconds)
   a. https://github.com/apache/gobblin/blob/8c9c8a84ed23c0215c4d80125ac532e97085d76f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java#L253
   b. https://github.com/apache/gobblin/blob/8c9c8a84ed23c0215c4d80125ac532e97085d76f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java#L365
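The sequence in this comment can be sketched as follows. This is only an illustration under stated assumptions: `WorkflowDriver`, `State`, and `FakeDriver` are hypothetical stand-ins invented for the sketch, not the Helix `TaskDriver` API or Gobblin's `HelixUtils` code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StopThenDeleteSketch {
  enum State { RUNNING, STOPPING, STOPPED }

  /** Hypothetical stand-in for the small part of a task driver this flow needs. */
  interface WorkflowDriver {
    void stop(String workflowId);      // asynchronous: only requests the stop
    State getState(String workflowId); // returns null once the workflow is deleted
    void delete(String workflowId);
  }

  /** In-memory fake: after stop(), the first poll sees STOPPING, the next sees STOPPED. */
  static class FakeDriver implements WorkflowDriver {
    private final Map<String, State> states = new ConcurrentHashMap<>();

    FakeDriver(String workflowId) {
      states.put(workflowId, State.RUNNING);
    }

    public void stop(String workflowId) {
      states.computeIfPresent(workflowId, (id, state) -> State.STOPPING);
    }

    public State getState(String workflowId) {
      State current = states.get(workflowId);
      if (current == State.STOPPING) {
        states.put(workflowId, State.STOPPED); // simulate the controller finishing the stop
      }
      return current;
    }

    public void delete(String workflowId) {
      states.remove(workflowId);
    }
  }

  /** Step 1: request the stop. Step 2: poll until STOPPED, then delete. */
  static boolean stopAndDelete(WorkflowDriver driver, String workflowId,
                               long timeoutMillis, long pollMillis) {
    driver.stop(workflowId);
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      if (driver.getState(workflowId) == State.STOPPED) {
        driver.delete(workflowId);
        return true;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and give up
        return false;
      }
    }
    return false; // the workflow never reached STOPPED, so nothing was deleted
  }

  public static void main(String[] args) {
    FakeDriver driver = new FakeDriver("wf");
    System.out.println(stopAndDelete(driver, "wf", 1000L, 1L));
  }
}
```

Returning `false` on timeout rather than throwing leaves the caller free to decide whether to retry or to delete anyway.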
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -431,7 +431,7 @@ private void
cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
String workflowId =
jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
- taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
+ taskDriver.stop(workflowId);
Review Comment:
The main issue is that using a timeout causes an exception to be thrown, so
the delete never happens, right? That portion of the change LGTM.
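A small sketch of the failure mode raised in this comment, assuming the timed wait throws when the timeout elapses. The method names are hypothetical and the calls are recorded in a list rather than sent to Helix; this is not the `TaskDriver` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class TimeoutSkipsDeleteSketch {

  // Hypothetical blocking wait; in this sketch it always times out.
  static void waitToStopOrThrow(String workflowId, List<String> calls) throws TimeoutException {
    calls.add("wait");
    throw new TimeoutException("workflow " + workflowId + " did not stop in time");
  }

  // Blocking variant: the exception leaves the try block before the delete is recorded.
  static List<String> cancelWithBlockingWait(String workflowId) {
    List<String> calls = new ArrayList<>();
    try {
      waitToStopOrThrow(workflowId, calls);
      calls.add("delete"); // never reached when the wait throws
    } catch (TimeoutException e) {
      calls.add("timeout");
    }
    return calls;
  }

  // Non-blocking variant: the stop is only requested, so the later step still runs.
  static List<String> cancelWithAsyncStop(String workflowId) {
    List<String> calls = new ArrayList<>();
    calls.add("stop-requested");
    calls.add("delete");
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(cancelWithBlockingWait("wf"));
    System.out.println(cancelWithAsyncStop("wf"));
  }
}
```

In the blocking variant the recorded calls contain no `delete` entry, which is the behavior this comment is asking about.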
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java:
##########
@@ -175,7 +171,7 @@ public void run() {
this.thread.start();
}
- private Properties generateJobProperties(Config baseConfig, String
jobNameSuffix, String jobIdSuffix) {
+ static public Properties generateJobProperties(Config baseConfig, String
jobNameSuffix, String jobIdSuffix) {
Review Comment:
You made this `static public` for testing, right? If so, nit: you can make it
package-private and also use the `@VisibleForTesting` annotation to make that
clearer.