ZihanLi58 commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244336768
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
}
@Subscribe
- public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+ public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
Review Comment:
1. handleUpdateJobConfigArrival calls handleDeleteJobConfigArrival directly.
So if you want to reset the timer only on an explicit delete, remember to
distinguish the two call paths here: one comes from
handleUpdateJobConfigArrival, the other is a direct delete.
2. Resetting the timer here only helps if you change all the cancel APIs in
our code to send a delete job message that triggers this method; in that case,
the reset lets us start a new job immediately. Otherwise it does not make
sense to do 1, as we don't explicitly delete anyway...
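
One way to keep the two call paths apart is to route both handlers through a private helper that takes a flag. The following is only a rough sketch under assumptions: the class, the helper deleteJob, the String-keyed handler signatures, and the Map<String, Instant> type for jobNameToNextSchedulableTime are all hypothetical and are not taken from the PR.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the scheduler; only the field name
// jobNameToNextSchedulableTime appears in the PR, its type is assumed.
class ThrottleResetSketch {
  final Map<String, Instant> jobNameToNextSchedulableTime = new ConcurrentHashMap<>();

  // Explicit delete message: drop the throttle entry so a new job
  // with the same name can be scheduled immediately.
  synchronized void handleDelete(String jobName) {
    deleteJob(jobName, true);
  }

  // Update message: delete and reschedule, but keep the throttle entry.
  synchronized void handleUpdate(String jobName) {
    deleteJob(jobName, false);
    // ... reschedule the job with the new config here ...
  }

  // Shared delete logic; the flag marks which caller asked for it.
  private void deleteJob(String jobName, boolean resetThrottle) {
    // ... cancel and unschedule the job here ...
    if (resetThrottle) {
      jobNameToNextSchedulableTime.remove(jobName);
    }
  }
}
```

As point 2 notes, this split only pays off if the cancel paths actually send a delete message; otherwise the reset branch is never reached.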
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
- scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ scheduleJob(jobProps, listener);
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
- this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+ this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
}
}
@Subscribe
- public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+ public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
Review Comment:
@homatthew are we sure this change won't hurt performance when these
message-handling methods are called frequently? (That's why I initially
suggested a job-level lock.)
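
For reference, a job-level lock could look roughly like the sketch below. It is illustrative only and not code from the PR: the class, the jobLocks map, and withJobLock are hypothetical names. Handlers for different job names do not block each other, and a ReentrantLock is used because the update handler calls the delete handler directly for the same job.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: one lock per job name instead of one
// scheduler-wide monitor via synchronized methods.
class PerJobLockSketch {
  private final ConcurrentMap<String, ReentrantLock> jobLocks = new ConcurrentHashMap<>();

  // Runs the action while holding the lock that belongs to jobName.
  void withJobLock(String jobName, Runnable action) {
    ReentrantLock lock = jobLocks.computeIfAbsent(jobName, k -> new ReentrantLock());
    lock.lock();
    try {
      action.run();
    } finally {
      lock.unlock();
    }
  }

  // Number of job names that currently have a lock object.
  int lockCount() {
    return jobLocks.size();
  }
}
```

This sketch never removes entries from jobLocks, so a real version would need a cleanup policy for jobs that are gone for good.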