umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231628803


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean isThrottleEnabled = 
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (isThrottleEnabled && this.jobNameToStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobNameToStartTimeMap.get(jobName);
+      Duration workflowRunningDuration = Duration.between(jobStartTime, 
Instant.now());
+      if 
(workflowRunningDuration.minus(throttleTimeoutDurationSecs).isNegative()) {
+        LOGGER.info("Replanning is skipped for job {} ", jobName);

Review Comment:
   nit: can add "skipped due to job being started close to another start" or 
"due to scheduling throttle enabled..."



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()

Review Comment:
   nice work in updating all the tests



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +55,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.*;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
+
+/**
+ * In all test cases, we use HelixManager as a local variable to avoid

Review Comment:
   nice helpful description, let's put this with the other comment above in one 
block



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to