[ https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=865899&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865899 ]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Jun/23 23:42
Start Date: 15/Jun/23 23:42
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231628803
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent
updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+ boolean isThrottleEnabled =
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+ if (isThrottleEnabled && this.jobNameToStartTimeMap.containsKey(jobName)) {
+ Instant jobStartTime = this.jobNameToStartTimeMap.get(jobName);
+ Duration workflowRunningDuration = Duration.between(jobStartTime,
Instant.now());
+ if
(workflowRunningDuration.minus(throttleTimeoutDurationSecs).isNegative()) {
+ LOGGER.info("Replanning is skipped for job {} ", jobName);
Review Comment:
nit: the log message could state why replanning is skipped, e.g. "skipped due
to job being started close to another start" or "due to scheduling throttle
enabled..."
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
@Test
public void testNewJobAndUpdate()
throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
Review Comment:
nice work in updating all the tests
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +55,23 @@
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
+import static org.mockito.Mockito.*;
+
/**
* Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
*
*/
+
+/**
+ * In all test cases, we use HelixManager as a local variable to avoid
Review Comment:
nice helpful description, let's put this with the other comment above in one
block
Issue Time Tracking
-------------------
Worklog Id: (was: 865899)
Time Spent: 2.5h (was: 2h 20m)
> Helix Job scheduler should not try to replace running workflow if within
> configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)