[ 
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=865824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865824
 ]

ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/23 16:49
            Start Date: 15/Jun/23 16:49
    Worklog Time Spent: 10m 
      Work Description: Peiyingy commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231302778


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2", 
workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2", 
workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2", 
workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle1", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle2", 
workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, 
helixManager);
+    }
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+        new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties 
properties, String suffix) {
+    
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new 
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new 
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties);
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+        "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
suffix);
+    return newJobConfigArrivalEvent;
+  }
 
-    String workFlowId = null;
-    long endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
-    }
+  private void connectAndAssertWorkflowId(String expectedSuffix, 
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) 
throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
     Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
+  private String getWorkflowID (NewJobConfigArrivalEvent 
newJobConfigArrivalEvent, HelixManager helixManager )
+      throws Exception {
+    long endTime = System.currentTimeMillis() + 30000;

Review Comment:
   The endTime setting is based on the original testNewJobAndUpdate approach. 
Maybe I should add a comment here to explain this hardcoded time is the period 
we allow workflowIdMap to fetch workflowIdMap before timeout?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 865824)
    Time Spent: 40m  (was: 0.5h)

> Helix Job scheduler should not try to replace running workflow if within 
> configured time
> ----------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1840
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to