homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244470463


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -522,7 +534,7 @@ public void run() {
         
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis,
 System.currentTimeMillis());
         GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
       } catch (JobException je) {
-        LOGGER.error("Failed to run job " + 
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+        LOGGER.error("Failed to schedule or run job to run job " + 
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);

Review Comment:
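   Typo / wording: the new message reads `schedule or run job to run job`, 
   which repeats "to run job". Something like `Failed to schedule or run job` 
   would read better.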



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,192 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    Config helixJobSchedulerConfig = 
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties 
properties) {
+    
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new 
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new 
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, String 
jobName, HelixManager helixManager) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(jobName, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (String jobName, HelixManager helixManager)
+      throws Exception {
+    // Poll helix for up to 30 seconds to fetch until a workflow with a 
matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
+    Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
+      try{
+        workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+            Collections.singletonList(jobName));
+      } catch(GobblinHelixUnexpectedStateException e){
+        continue;
+      }
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
-    Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    return null;
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
+  private void runWorkflowTest(Duration mockedPeriod, String jobSuffix,
+    String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
+    String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean 
isSameWorkflow) throws Exception {
+    Clock mockClock = Mockito.mock(Clock.class);
+    AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
+    when(mockClock.instant()).thenAnswer(invocation -> 
nextInstant.getAndAccumulate(nextInstant.get(), (currentInstant, x) -> 
currentInstant.plus(mockedPeriod)));

Review Comment:
   `nextInstant.get()` is not actually used by the accumulator function and is 
   just a placeholder, right? Since it is only a placeholder value, you could 
   pass something like `null` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to