[
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=867888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-867888
]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jun/23 23:34
Start Date: 27/Jun/23 23:34
Worklog Time Spent: 10m
Work Description: homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244481162
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,192 @@ public void setUp()
this.thread.start();
}
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
enabled
+ * Job will not be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ * Job will be updated
+ * @throws Exception
+ */
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ Config helixJobSchedulerConfig =
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+ ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+ GobblinHelixJobScheduler gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog, clock);
+ return gobblinHelixJobScheduler;
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ return newJobConfigArrivalEvent;
+ }
+
+ private void connectAndAssertWorkflowId(String expectedSuffix, String
jobName, HelixManager helixManager) throws Exception {
+ helixManager.connect();
+ String workFlowId = getWorkflowID(jobName, helixManager);
+ Assert.assertNotNull(workFlowId);
+ Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+ }
- String workFlowId = null;
+ private String getWorkflowID (String jobName, HelixManager helixManager)
+ throws Exception {
+ // Poll helix for up to 30 seconds to fetch until a workflow with a
matching job name exists in Helix and then return that workflowID
long endTime = System.currentTimeMillis() + 30000;
+ Map<String, String> workflowIdMap;
while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
+ try{
+ workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+ Collections.singletonList(jobName));
+ } catch(GobblinHelixUnexpectedStateException e){
+ continue;
+ }
+ if (workflowIdMap.containsKey(jobName)) {
+ return workflowIdMap.get(jobName);
}
Thread.sleep(100);
}
- Assert.assertNotNull(workFlowId);
- Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+ return null;
+ }
- jobScheduler.handleUpdateJobConfigArrival(
- new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
- this.helixManager.connect();
- endTime = System.currentTimeMillis() + 30000;
- while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
- }
- Thread.sleep(100);
+ private void runWorkflowTest(Duration mockedPeriod, String jobSuffix,
Review Comment:
A java doc that describes what these variables are so that future people can
use this method would be helpful.
Also, mockedPeriod is a bit of a weird name. Since you're now using it to
represent the step amount each time clock.instant() is called.
Is this really necessary? In your original implementation it was just
returning a final Instant defined at the beginning which was a bit easier to
reason about. But now we sort of rely on how many times clock.instant() is
called to know what the current time is
Issue Time Tracking
-------------------
Worklog Id: (was: 867888)
Time Spent: 4h 10m (was: 4h)
> Helix Job scheduler should not try to replace running workflow if within
> configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)