[
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=867887&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-867887
]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jun/23 23:31
Start Date: 27/Jun/23 23:31
Worklog Time Spent: 10m
Work Description: homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244470463
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -522,7 +534,7 @@ public void run() {
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis,
System.currentTimeMillis());
GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
} catch (JobException je) {
- LOGGER.error("Failed to run job " +
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+ LOGGER.error("Failed to schedule or run job to run job " +
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
Review Comment:
Typo / wording: `schedule or run job to run job` repeats "run job"; it should probably read `Failed to schedule or run job `.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,192 @@ public void setUp()
this.thread.start();
}
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
enabled
+ * Job will not be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ * Job will be updated
+ * @throws Exception
+ */
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ Config helixJobSchedulerConfig =
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+ ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+ GobblinHelixJobScheduler gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog, clock);
+ return gobblinHelixJobScheduler;
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ return newJobConfigArrivalEvent;
+ }
+
+ private void connectAndAssertWorkflowId(String expectedSuffix, String
jobName, HelixManager helixManager) throws Exception {
+ helixManager.connect();
+ String workFlowId = getWorkflowID(jobName, helixManager);
+ Assert.assertNotNull(workFlowId);
+ Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+ }
- String workFlowId = null;
+ private String getWorkflowID (String jobName, HelixManager helixManager)
+ throws Exception {
+ // Poll helix for up to 30 seconds to fetch until a workflow with a
matching job name exists in Helix and then return that workflowID
long endTime = System.currentTimeMillis() + 30000;
+ Map<String, String> workflowIdMap;
while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
+ try{
+ workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+ Collections.singletonList(jobName));
+ } catch(GobblinHelixUnexpectedStateException e){
+ continue;
+ }
+ if (workflowIdMap.containsKey(jobName)) {
+ return workflowIdMap.get(jobName);
}
Thread.sleep(100);
}
- Assert.assertNotNull(workFlowId);
- Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+ return null;
+ }
- jobScheduler.handleUpdateJobConfigArrival(
- new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
- this.helixManager.connect();
- endTime = System.currentTimeMillis() + 30000;
- while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
- }
- Thread.sleep(100);
+ private void runWorkflowTest(Duration mockedPeriod, String jobSuffix,
+ String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
+ String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean
isSameWorkflow) throws Exception {
+ Clock mockClock = Mockito.mock(Clock.class);
+ AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
+ when(mockClock.instant()).thenAnswer(invocation ->
nextInstant.getAndAccumulate(nextInstant.get(), (currentInstant, x) ->
currentInstant.plus(mockedPeriod)));
Review Comment:
`nextInstant.get()` is not used and is just a placeholder, right? The accumulator lambda ignores its second argument (`x`), so you could pass something like `null` instead.
Issue Time Tracking
-------------------
Worklog Id: (was: 867887)
Time Spent: 4h (was: 3h 50m)
> Helix Job scheduler should not try to replace running workflow if within
> configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 4h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)