[
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=867637&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-867637
]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jun/23 04:27
Start Date: 27/Jun/23 04:27
Worklog Time Spent: 10m
Work Description: homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1243109243
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics,
jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
- scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ scheduleJob(jobProps, listener);
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
- this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
- new
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+ this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
listener));
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
}
}
@Subscribe
- public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent
updateJobArrival) {
+ public synchronized void
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+
+ Instant nextSchedulableTime =
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
+ if (this.isThrottleEnabled &&
clock.instant().isBefore(nextSchedulableTime)) {
+ LOGGER.info("Replanning is skipped for job {}. Current time is "
+ + clock.instant() + " and the next schedulable time would be "
Review Comment:
`clock.instant()` should be passed using the `{}` placeholder syntax. Same for
the next schedulable time. And instead of getting the value from the map, use
the `nextSchedulableTime` variable.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
this.thread.start();
}
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
enabled
+ * Job will not be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ Config helixJobSchedulerConfig =
ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",
+ ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+ GobblinHelixJobScheduler gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog, clock);
+ return gobblinHelixJobScheduler;
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ return newJobConfigArrivalEvent;
+ }
+
+ private void connectAndAssertWorkflowId(String expectedSuffix,
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
throws Exception {
+ helixManager.connect();
+ String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+ Assert.assertNotNull(workFlowId);
+ Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+ }
- String workFlowId = null;
+ private String getWorkflowID (NewJobConfigArrivalEvent
newJobConfigArrivalEvent, HelixManager helixManager)
+ throws Exception {
+ // Poll helix for up to 30 seconds to fetch until a workflow with a
matching job name exists in Helix and then return that workflowID
long endTime = System.currentTimeMillis() + 30000;
+ Map<String, String> workflowIdMap;
while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+ try{
+ workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+ Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+ } catch(GobblinHelixUnexpectedStateException e){
+ continue;
+ }
if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
+ return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
}
Thread.sleep(100);
}
- Assert.assertNotNull(workFlowId);
- Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+ return null;
+ }
- jobScheduler.handleUpdateJobConfigArrival(
- new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
- this.helixManager.connect();
- endTime = System.currentTimeMillis() + 30000;
- while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
- }
- Thread.sleep(100);
+ private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+ String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
+ String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean
isSameWorkflow) throws Exception {
+ Clock mockClock = Mockito.mock(Clock.class);
+ AtomicInteger count = new AtomicInteger(0);
+ when(mockClock.instant()).thenAnswer(invocation -> count.getAndIncrement()
== 0 ? beginTime : mockedTime);
+
+ // Use GobblinHelixManagerFactory instead of HelixManagerFactory to avoid
the connection error
+ // helixManager is set to local variable to avoid the HelixManager
(ZkClient) is not connected error across tests
+ HelixManager helixManager = GobblinHelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager,
isThrottleEnabled, mockClock);
+ final Properties properties =
+ GobblinHelixJobLauncherTest.generateJobProperties(
Review Comment:
Nit: was this meant to be on a new line? Seems like it would fit fine on the
line above
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+@Slf4j
+public class GobblinThrottlingHelixJobLauncherListener extends
GobblinHelixJobLauncherListener {
+
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+ private Duration helixJobSchedulingThrottleTimeout;
+ private Clock clock;
+
+ public
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics
jobLauncherMetrics,
+ ConcurrentHashMap jobNameToNextSchedulableTime, Duration
helixJobSchedulingThrottleTimeout, Clock clock) {
Review Comment:
Shouldn't this be `ConcurrentHashMap<String, Instant>` instead of
just `ConcurrentHashMap`?
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
this.thread.start();
}
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
enabled
+ * Job will not be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ Config helixJobSchedulerConfig =
ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",
Review Comment:
Use the
`GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY`
instead of the raw string value
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
}
@Subscribe
- public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent
deleteJobArrival) throws InterruptedException {
+ public synchronized void
handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival)
throws InterruptedException {
Review Comment:
Super minor nit. Not sure if it's even worth implementing:
Would we want to reset the `Instant` in the map to `Instant.EPOCH` if we
delete a workflow? My understanding is that internally we don't use this delete
job config method and only rely on update, so this wouldn't really affect our
own use case.
I am not sure which behavior is more intuitive:
1. If I explicitly delete, I should be able to reschedule it and bypass the
throttle time
2. Regardless of whether I deleted the old flow, the throttle time should
prevent resubmission
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
this.thread.start();
}
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
enabled
+ * Job will not be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ Config helixJobSchedulerConfig =
ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",
+ ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+ GobblinHelixJobScheduler gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog, clock);
+ return gobblinHelixJobScheduler;
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ return newJobConfigArrivalEvent;
+ }
+
+ private void connectAndAssertWorkflowId(String expectedSuffix,
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
throws Exception {
Review Comment:
Random question, but why do we use `NewJobConfigArrivalEvent` instead of
just passing a string job name? There were some places in the code where we
constructed a brand new `NewJobConfigArrivalEvent` just to pass it into this
method.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics,
jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
- scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ scheduleJob(jobProps, listener);
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
- this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
- new
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+ this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
listener));
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
}
}
@Subscribe
- public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent
updateJobArrival) {
+ public synchronized void
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+
+ Instant nextSchedulableTime =
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
Review Comment:
Random question, but is there a reason we use `Instant.MIN` as the default
value here and `Instant.EPOCH` as the placeholder elsewhere?
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -303,7 +332,7 @@ public Object get(long timeout, TimeUnit unit) throws
InterruptedException, Exec
}
@Subscribe
- public void handleNewJobConfigArrival(NewJobConfigArrivalEvent
newJobArrival) {
+ public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent
newJobArrival) {
Review Comment:
I see a race condition here. Although only 1 thread can access this method
at a time, the scheduling itself is still async, i.e. the `onJobPrepare` method
is called asynchronously.
Suppose we have 2 threads `T1` and `T2`. And consider the following
interleaving:
```
Synchronous:
T1 calls handleNewJobConfigArrival
T1 submits listener to executor
T1 finishes handleNewJobConfigArrival
Synchronous:
T2 calls handleNewJobConfigArrival
T2 submits listener to executor
T2 finishes handleNewJobConfigArrival
Executor picks up runnable submitted by T1
Executor picks up runnable submitted by T2
```
This would fail to throttle since both were submitted to the executor. We
should make sure to update the map before submitting, and if there is a
`JobException` we should reset the time exactly the same way it is done in the
listener.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
}
@Subscribe
- public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent
deleteJobArrival) throws InterruptedException {
+ public synchronized void
handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival)
throws InterruptedException {
Review Comment:
The current behavior is (2). To get behavior (1), we would:
1. Save the value currently stored in the map,
2. Set the value in the map to `Instant.EPOCH`
3. If there is a job exception we reset the value back to the original value
that was in the map
The delete operations are synchronous and the method is `synchronized`, so
this approach would be thread safe
Issue Time Tracking
-------------------
Worklog Id: (was: 867637)
Time Spent: 3h (was: 2h 50m)
> Helix Job scheduler should not try to replace running workflow if within
> configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 3h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)