umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231364395


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2", 
workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2", 
workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2", 
workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle1", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle2", 
workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, 
helixManager);
+    }
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+        new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties 
properties, String suffix) {
+    
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new 
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new 
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties);
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+        "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
suffix);
+    return newJobConfigArrivalEvent;
+  }
 
-    String workFlowId = null;
-    long endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
-    }
+  private void connectAndAssertWorkflowId(String expectedSuffix, 
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) 
throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
     Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
+  private String getWorkflowID (NewJobConfigArrivalEvent 
newJobConfigArrivalEvent, HelixManager helixManager )
+      throws Exception {
+    long endTime = System.currentTimeMillis() + 30000;

Review Comment:
   Yes, it would be good to explain why this value was chosen!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to