homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244481162
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,192 @@ public void setUp()
this.thread.start();
}
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
enabled
+ * Job will not be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ * Job will be updated
+ * @throws Exception
+ */
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ Config helixJobSchedulerConfig =
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+ ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+ GobblinHelixJobScheduler gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog, clock);
+ return gobblinHelixJobScheduler;
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ return newJobConfigArrivalEvent;
+ }
+
+ private void connectAndAssertWorkflowId(String expectedSuffix, String
jobName, HelixManager helixManager) throws Exception {
+ helixManager.connect();
+ String workFlowId = getWorkflowID(jobName, helixManager);
+ Assert.assertNotNull(workFlowId);
+ Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+ }
- String workFlowId = null;
+ private String getWorkflowID (String jobName, HelixManager helixManager)
+ throws Exception {
+ // Poll helix for up to 30 seconds to fetch until a workflow with a
matching job name exists in Helix and then return that workflowID
long endTime = System.currentTimeMillis() + 30000;
+ Map<String, String> workflowIdMap;
while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
+ try{
+ workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+ Collections.singletonList(jobName));
+ } catch(GobblinHelixUnexpectedStateException e){
+ continue;
+ }
+ if (workflowIdMap.containsKey(jobName)) {
+ return workflowIdMap.get(jobName);
}
Thread.sleep(100);
}
- Assert.assertNotNull(workFlowId);
- Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+ return null;
+ }
- jobScheduler.handleUpdateJobConfigArrival(
- new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
- this.helixManager.connect();
- endTime = System.currentTimeMillis() + 30000;
- while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
- }
- Thread.sleep(100);
+ private void runWorkflowTest(Duration mockedPeriod, String jobSuffix,
Review Comment:
A Javadoc describing what these parameters are would be helpful, so that
future contributors can use this method.
Also, mockedPeriod is a bit of a weird name, since you're now using it to
represent the step amount applied each time clock.instant() is called.
Is this really necessary? In your original implementation it just returned
a final Instant defined at the beginning, which was a bit easier to reason
about. Now we sort of rely on how many times clock.instant() has been
called to know what the current time is.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]