[
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=867863&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-867863
]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jun/23 20:53
Start Date: 27/Jun/23 20:53
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244336768
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
}
@Subscribe
- public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+ public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
Review Comment:
1. handleUpdateJobConfigArrival calls handleDeleteJobConfigArrival directly.
So if you want to reset the timer specifically on delete, remember to
distinguish the two call paths here: one goes through
handleUpdateJobConfigArrival, and the other is a direct call.
2. Resetting the timer only helps if you change all the cancel APIs in our
code to send a delete job message that triggers this method; in that case the
reset lets us start a new job immediately. Otherwise there is no point in
doing 1, as we don't explicitly delete anyway...
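To make point 1 concrete, here is a minimal sketch of one way the two call paths could be kept apart. It is not the code in the PR: the class ThrottleSketch, its methods, and the resetTimer flag are all hypothetical, and only the map name jobNameToNextSchedulableTime and the injected clock are borrowed from the diff. It assumes the intended behavior is that an explicit delete clears the throttle window while a delete issued as part of an update does not.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; not the actual GobblinHelixJobScheduler. */
class ThrottleSketch {
  // Job name -> earliest instant at which the job may be scheduled again.
  private final Map<String, Instant> jobNameToNextSchedulableTime = new ConcurrentHashMap<>();
  private final Duration throttleTimeout;
  private final Clock clock;

  ThrottleSketch(Duration throttleTimeout, Clock clock) {
    this.throttleTimeout = throttleTimeout;
    this.clock = clock;
  }

  /** Returns true and arms the timer if the job is outside its throttle window. */
  synchronized boolean tryScheduleJob(String jobName) {
    Instant now = clock.instant();
    Instant next = jobNameToNextSchedulableTime.get(jobName);
    if (next != null && now.isBefore(next)) {
      return false; // still throttled
    }
    jobNameToNextSchedulableTime.put(jobName, now.plus(throttleTimeout));
    return true;
  }

  /** Explicit delete message: the timer is reset (assumed behavior). */
  synchronized void handleDelete(String jobName) {
    deleteJob(jobName, true);
  }

  /** Update: delete, then reschedule; the timer is left alone (assumed behavior). */
  synchronized boolean handleUpdate(String jobName) {
    deleteJob(jobName, false);
    return tryScheduleJob(jobName);
  }

  // Shared delete logic; cancelling the running workflow is omitted here.
  private void deleteJob(String jobName, boolean resetTimer) {
    if (resetTimer) {
      jobNameToNextSchedulableTime.remove(jobName);
    }
  }
}
```

With this shape, an update arriving inside the window is still rejected, while a job can be rescheduled right after an explicit delete. Whether that helps in practice depends on point 2, i.e. on whether cancels actually arrive as delete messages.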
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
- scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ scheduleJob(jobProps, listener);
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
- this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+ this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
}
}
@Subscribe
- public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+ public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
Review Comment:
@homatthew are we sure this change won't affect performance when these
message-handling methods are called frequently? (That's why I initially
suggested a job-level lock.)
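For reference, a job-level lock could look roughly like the sketch below. This is only an illustration of the idea, not something taken from the PR: JobLevelLockSketch, lockFor and handleEvent are hypothetical names. It uses one ReentrantLock per job name, so handlers for different jobs do not block each other, and a handler that re-enters for the same job (as an update calling delete would) does not deadlock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of per-job locking; not the actual scheduler code. */
class JobLevelLockSketch {
  // One lock per job name, created lazily and atomically.
  private final ConcurrentMap<String, Lock> jobLocks = new ConcurrentHashMap<>();

  Lock lockFor(String jobName) {
    return jobLocks.computeIfAbsent(jobName, name -> new ReentrantLock());
  }

  /** Runs the handler while holding only the lock of the given job. */
  void handleEvent(String jobName, Runnable handler) {
    Lock lock = lockFor(jobName);
    lock.lock();
    try {
      handler.run();
    } finally {
      lock.unlock();
    }
  }
}
```

The trade-off against synchronized methods is that the lock map grows with the number of distinct job names unless entries are removed when jobs are deleted, which this sketch does not attempt.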
Issue Time Tracking
-------------------
Worklog Id: (was: 867863)
Time Spent: 3h 10m (was: 3h)
> Helix Job scheduler should not try to replace running workflow if within
> configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>