[ 
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=865899&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865899
 ]

ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/23 23:42
            Start Date: 15/Jun/23 23:42
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231628803


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean isThrottleEnabled = 
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (isThrottleEnabled && this.jobNameToStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobNameToStartTimeMap.get(jobName);
+      Duration workflowRunningDuration = Duration.between(jobStartTime, 
Instant.now());
+      if 
(workflowRunningDuration.minus(throttleTimeoutDurationSecs).isNegative()) {
+        LOGGER.info("Replanning is skipped for job {} ", jobName);

Review Comment:
   nit: can add "skipped due to job being started close to another start" or 
"due to scheduling throttle enabled..."



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, 
CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", 
workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, 
helixManager);
+
+      
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
 "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, 
helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()

Review Comment:
   nice work in updating all the tests



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +55,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.*;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
+
+/**
+ * In all test cases, we use HelixManager as a local variable to avoid

Review Comment:
   nice helpful description, let's put this with the other comment above in one 
block





Issue Time Tracking
-------------------

    Worklog Id:     (was: 865899)
    Time Spent: 2.5h  (was: 2h 20m)

> Helix Job scheduler should not try to replace running workflow if within 
> configured time
> ----------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1840
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to