ZihanLi58 commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244336768


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
   }
 
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent 
deleteJobArrival) throws InterruptedException {
+  public synchronized void 
handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) 
throws InterruptedException {

Review Comment:
   1. In handleUpdateJobConfigArrival, we call handleDeleteJobConfigArrival 
directly. So if you want to specifically reset it, remember to distinguish the 
two calls here (one is called by handleUpdateJobConfigArrival) and another is 
called directly. 
   2. Unless you change all the cancel APIs in our code to send a delete job 
message to trigger this method, then, in that case, resetting the timer can 
enable us to start a new job immediately, otherwise it does not make sense to 
achieve 1, as we don't explicitly delete anyway... 



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        scheduleJob(jobProps, listener);
       } else {
         LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, 
listener));
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
+  public synchronized void 
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {

Review Comment:
   @homatthew are we sure this change won't affect performance when those 
message-handling methods will be called frequently? (That's why initially I 
suggested having job level lock)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to