[
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=865838&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865838
]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Jun/23 17:53
Start Date: 15/Jun/23 17:53
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231364395
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
@Test
public void testNewJobAndUpdate()
throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties, String suffix) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+ "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
suffix);
+ return newJobConfigArrivalEvent;
+ }
- String workFlowId = null;
- long endTime = System.currentTimeMillis() + 30000;
- while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
- }
- Thread.sleep(100);
- }
+ private void connectAndAssertWorkflowId(String expectedSuffix,
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
throws Exception {
+ helixManager.connect();
+ String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
Assert.assertNotNull(workFlowId);
- Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+ Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+ }
- jobScheduler.handleUpdateJobConfigArrival(
- new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
- this.helixManager.connect();
- endTime = System.currentTimeMillis() + 30000;
+ private String getWorkflowID (NewJobConfigArrivalEvent
newJobConfigArrivalEvent, HelixManager helixManager )
+ throws Exception {
+ long endTime = System.currentTimeMillis() + 30000;
Review Comment:
Yes, it would be good to add a comment explaining why this value (the 30000 ms / 30-second polling timeout) was chosen!
Issue Time Tracking
-------------------
Worklog Id: (was: 865838)
Time Spent: 1h 40m (was: 1.5h)
> Helix job scheduler should not try to replace a running workflow if the update
> arrives within the configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)