homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244481162


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,192 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    Config helixJobSchedulerConfig = 
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties 
properties) {
+    
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new 
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new 
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, String 
jobName, HelixManager helixManager) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(jobName, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (String jobName, HelixManager helixManager)
+      throws Exception {
+    // Poll helix for up to 30 seconds to fetch until a workflow with a 
matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
+    Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
+      try{
+        workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+            Collections.singletonList(jobName));
+      } catch(GobblinHelixUnexpectedStateException e){
+        continue;
+      }
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
-    Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    return null;
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
+  private void runWorkflowTest(Duration mockedPeriod, String jobSuffix,

Review Comment:
   A Javadoc describing what these parameters are would be helpful, so that 
future people can use this method. 
   
   Also, mockedPeriod is a bit of a weird name, since you're now using it to 
represent the step amount each time clock.instant() is called. 
   
   Is this really necessary? In your original implementation it was just 
returning a final Instant defined at the beginning, which was a bit easier to 
reason about. But now we sort of rely on how many times clock.instant() is 
called to know what the current time is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to