homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1243109243


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
      jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
      this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
      if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
        LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        scheduleJob(jobProps, listener);
      } else {
        LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
      }
    } catch (JobException je) {
      LOGGER.error("Failed to schedule or run job " + jobUri, je);
    }
  }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+  public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+
+    Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
+    if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
+      LOGGER.info("Replanning is skipped for job {}. Current time is "
+          + clock.instant() + " and the next schedulable time would be "

Review Comment:
   `clock.instant()` should be logged using the `{}` placeholder syntax, and the same goes for the
   next schedulable time. Also, instead of reading the value from the map again, use the
   `nextSchedulableTime` variable that was just computed.
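   
   A minimal sketch of the suggested change, reusing the variables already in scope in `handleUpdateJobConfigArrival` (the exact message wording is just an example):
   ```java
   // Let SLF4J fill in the placeholders instead of concatenating strings,
   // and reuse the nextSchedulableTime local rather than re-reading the map.
   LOGGER.info("Replanning is skipped for job {}. Current time is {} and the next schedulable time would be {}",
       jobName, clock.instant(), nextSchedulableTime);
   ```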



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
       throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    Config helixJobSchedulerConfig = ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager)
+      throws Exception {
+    // Poll helix for up to 30 seconds to fetch until a workflow with a matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
+    Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+      try{
+        workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+            Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+      } catch(GobblinHelixUnexpectedStateException e){
+        continue;
+      }
       if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
+        return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
       }
       Thread.sleep(100);
     }
-    Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    return null;
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
+  private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+    String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
+    String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean isSameWorkflow) throws Exception {
+    Clock mockClock = Mockito.mock(Clock.class);
+    AtomicInteger count = new AtomicInteger(0);
+    when(mockClock.instant()).thenAnswer(invocation -> count.getAndIncrement() == 0 ? beginTime : mockedTime);
+
+    // Use GobblinHelixManagerFactory instead of HelixManagerFactory to avoid the connection error
+    // helixManager is set to local variable to avoid the HelixManager (ZkClient) is not connected error across tests
+    HelixManager helixManager = GobblinHelixManagerFactory
+        .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+            zkConnectingString);
+    GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager, isThrottleEnabled, mockClock);
+    final Properties properties =
+        GobblinHelixJobLauncherTest.generateJobProperties(

Review Comment:
   Nit: was this meant to be on a new line? Seems like it would fit fine on the 
line above



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+@Slf4j
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {

Review Comment:
   Shouldn't this be `ConcurrentHashMap<String, Instant>` instead of the raw `ConcurrentHashMap`?
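   
   A sketch of the parameterized signature, assuming the parent listener takes the metrics in its constructor and the fields are assigned directly (the constructor body is not shown in the diff):
   ```java
   public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
       ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime,
       Duration helixJobSchedulingThrottleTimeout, Clock clock) {
     super(jobLauncherMetrics);
     this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
     this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
     this.clock = clock;
   }
   ```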



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
       throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    Config helixJobSchedulerConfig = ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",

Review Comment:
   Use the 
`GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY` 
instead of the raw string value
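   
   Something like the following (the constant name is taken from this suggestion; the rest mirrors the test code above):
   ```java
   Config helixJobSchedulerConfig = ConfigFactory.empty().withValue(
       GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
       ConfigValueFactory.fromAnyRef(isThrottleEnabled));
   ```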



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
   }
 
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+  public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {

Review Comment:
   Super minor nit, and I'm not sure it's even worth implementing:
   
   Would we want to reset the `Instant` in the map to `Instant.EPOCH` when a workflow is deleted? My
   understanding is that internally we don't use this delete-job-config method and only rely on update,
   so this wouldn't really affect our own use case.
   
   I am not sure which behavior is more intuitive:
   1. If I explicitly delete, I should be able to reschedule the job and bypass the throttle time
   2. Regardless of whether I deleted the old flow, the throttle time should prevent resubmission



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
       throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    Config helixJobSchedulerConfig = ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {

Review Comment:
   Random question, but why do we use `NewJobConfigArrivalEvent` instead of 
just passing a string job name? There were some places in the code where we 
constructed a brand new `NewJobConfigArrivalEvent` just to pass it into this 
method.
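   
   For illustration, a possible simplified signature (hypothetical; `getWorkflowID` would be adjusted the same way):
   ```java
   private void connectAndAssertWorkflowId(String expectedSuffix, String jobName, HelixManager helixManager)
       throws Exception {
     helixManager.connect();
     String workFlowId = getWorkflowID(jobName, helixManager);
     Assert.assertNotNull(workFlowId);
     Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
   }
   ```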



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
      jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
      this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
      if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
        LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        scheduleJob(jobProps, listener);
      } else {
        LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
      }
    } catch (JobException je) {
      LOGGER.error("Failed to schedule or run job " + jobUri, je);
    }
  }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+  public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+
+    Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);

Review Comment:
   Random question, but is there a reason we use `Instant.MIN` as the default value here and
   `Instant.EPOCH` as the placeholder elsewhere?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -303,7 +332,7 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec
   }
 
   @Subscribe
-  public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
+  public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {

Review Comment:
   I see a race condition here. Although only one thread can access this method at a time, the
   scheduling itself is still async, i.e. the `onJobPrepare` method is called asynchronously.
   
   Suppose we have two threads `T1` and `T2`, and consider the following interleaving:
   ```
   Synchronous:
     T1 calls handleNewJobConfigArrival
     T1 submits listener to executor
     T1 finishes handleNewJobConfigArrival
   Synchronous:
     T2 calls handleNewJobConfigArrival
     T2 submits listener to executor
     T2 finishes handleNewJobConfigArrival
   
   Executor picks up runnable submitted by T1
   Executor picks up runnable submitted by T2
   ```
   
   This would fail to throttle, since both jobs were submitted to the executor. We should make sure to
   update the map before submitting, and if there is a `JobException` we should reset the time exactly
   the same way it is done in the listener.
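   
   A rough sketch of one way to close that window: reserve the throttle slot inside the synchronized method before handing anything to the executor (the rollback value on failure is an assumption and should match whatever the listener does):
   ```java
   if (isThrottleEnabled) {
     // Reserve the window up front so a concurrent update/new arrival sees it immediately.
     jobNameToNextSchedulableTime.put(newJobArrival.getJobName(),
         clock.instant().plus(jobSchedulingThrottleTimeout));
   }
   try {
     if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
       scheduleJob(jobProps, listener);
     } else {
       this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
     }
   } catch (JobException je) {
     if (isThrottleEnabled) {
       // Roll back the reservation so a failed submission does not block a retry.
       jobNameToNextSchedulableTime.put(newJobArrival.getJobName(), Instant.EPOCH);
     }
     LOGGER.error("Failed to schedule or run job " + jobUri, je);
   }
   ```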



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
   }
 
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+  public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {

Review Comment:
   The current behavior is (2). To get behavior (1), we would:
   1. Save the value currently stored in the map,
   2. Set the value in the map to `Instant.EPOCH`,
   3. If there is a job exception, reset the value back to the original value that was in the map.
   
   The delete operations are synchronous and the method is `synchronized`, so this approach would be
   thread safe.
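   
   Roughly, as a sketch only (`deleteJobInternal` stands in for the existing delete handling, which isn't shown here):
   ```java
   public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival)
       throws InterruptedException {
     String jobName = deleteJobArrival.getJobName();
     // 1. Remember the value currently in the map so it can be restored on failure.
     Instant previousTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
     // 2. Reset to EPOCH so an explicit delete bypasses the throttle window.
     jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH);
     try {
       deleteJobInternal(deleteJobArrival);
     } catch (JobException je) {
       // 3. Restore the original value if the delete did not go through.
       jobNameToNextSchedulableTime.put(jobName, previousTime);
       LOGGER.error("Failed to delete job " + jobName, je);
     }
   }
   ```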



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
