This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1ecce5bc7 [GOBBLIN-1840] Helix Job scheduler should not try to replace
running workflow if within configured time (#3704)
1ecce5bc7 is described below
commit 1ecce5bc733b047b3c2b945cf4eba2bc21bac241
Author: Peiying Ye <[email protected]>
AuthorDate: Fri Jun 30 10:41:44 2023 -0700
[GOBBLIN-1840] Helix Job scheduler should not try to replace running
workflow if within configured time (#3704)
* [GOBBLIN-1840] Helix Job scheduler should not try to replace running
workflow if within configured time
* [GOBBLIN-1840] Remove unnecessary files
* [GOBBLIN-1840] Add config for throttleTimeoutDuration
* [GOBBLIN-1840] Clean up format and coding standard
* [GOBBLIN-1840] Clean up format layout
* [GOBBLIN-1840] Clean up auto format
* [GOBBLIN-1840] Clear up empty space
* [GOBBLIN-1840] Clarify naming standards and simplify repeated codes
* [GOBBLIN-1840] Add Javadoc on GobblinHelixJobSchedulerTest for setting
HelixManager as local variable
* [GOBBLIN-1840] Optimize imports and fix unit test errors
* [GOBBLIN-1840] Rewrite log info and add Javadoc
* [GOBBLIN-1840] Remove job status check
* [GOBBLIN-1840] Add log info and change config setting
* [GOBBLIN-1840] Add @Slf4j in GobblinThrottlingHelixJobLauncherListener
* [GOBBLIN-1840] Fix race condition of handleNewJobConfigArrival
* [GOBBLIN-1840] Improve mockClock mechanism
* [GOBBLIN-1840] Address comments
* [GOBBLIN-1840] Only put entry in jobNameToNextSchedulableTime when
throttle is enabled
* [GOBBLIN-1840] Remove extra schedulable time updates
* [GOBBLIN-1840] Fix checkstyle problems
---------
Co-authored-by: Peiying Ye <[email protected]>
---
.../cluster/GobblinClusterConfigurationKeys.java | 9 +
.../gobblin/cluster/GobblinHelixJobScheduler.java | 134 +++++++++---
.../GobblinThrottlingHelixJobLauncherListener.java | 69 ++++++
.../org/apache/gobblin/cluster/HelixUtils.java | 6 +-
.../cluster/GobblinHelixJobSchedulerTest.java | 243 ++++++++++++++++-----
5 files changed, 385 insertions(+), 76 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 31a8547aa..ef83ab029 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.cluster;
+import java.time.Duration;
+
import org.apache.gobblin.annotation.Alpha;
@@ -222,4 +224,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX +
"containerId";
public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX =
GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+ public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY =
"helix.job.scheduling.throttle.enabled";
+ public static final boolean
DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;
+
+ public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY
= "helix.job.scheduling.throttle.timeout.seconds";
+ public static final long
DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY =
Duration.ofMinutes(40).getSeconds();;
+
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 20f47da3e..d30554d2b 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -18,6 +18,10 @@
package org.apache.gobblin.cluster;
import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -111,14 +115,28 @@ public class GobblinHelixJobScheduler extends
JobScheduler implements StandardMe
private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
+ /**
+ * The throttling timeout prevents helix workflows with the same job name /
URI from being submitted
+ * more than once within the timeout period. This timeout is not reset by
deletes / cancels, meaning that
+ * if you delete a workflow within the timeout period, you cannot reschedule
until the timeout period is complete.
+ * However, if there is an error when launching the job, you can immediately
reschedule the flow. <br><br>
+ *
+ * NOTE: This throttle timeout period starts when the job launcher thread
picks up the runnable, meaning that the
+ * time it takes to submit to Helix and start running the flow is also
included as part of the timeout period.
+ */
+ private final Duration jobSchedulingThrottleTimeout;
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+ private boolean isThrottleEnabled;
+ private Clock clock;
+
public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Optional<HelixManager>
taskDriverHelixManager,
EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>>
metadataTags,
SchedulerService schedulerService,
- MutableJobCatalog jobCatalog) throws
Exception {
-
+ MutableJobCatalog jobCatalog,
+ Clock clock) throws Exception {
super(ConfigUtils.configToProperties(sysConfig), schedulerService);
this.commonJobProperties =
ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig,
COMMON_JOB_PROPS));
this.jobHelixManager = jobHelixManager;
@@ -162,6 +180,27 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS)
* 1000;
+ this.jobSchedulingThrottleTimeout =
Duration.of(ConfigUtils.getLong(sysConfig,
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY),
ChronoUnit.SECONDS);
+
+ this.jobNameToNextSchedulableTime = new ConcurrentHashMap<>();
+
+ this.isThrottleEnabled = ConfigUtils.getBoolean(sysConfig,
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY);
+
+ this.clock = clock;
+ }
+
+ public GobblinHelixJobScheduler(Config sysConfig,
+ HelixManager jobHelixManager,
+ Optional<HelixManager> taskDriverHelixManager,
+ EventBus eventBus,
+ Path appWorkDir, List<? extends Tag<?>> metadataTags,
+ SchedulerService schedulerService,
+ MutableJobCatalog jobCatalog) throws Exception {
+
+ this(sysConfig, jobHelixManager, taskDriverHelixManager, eventBus,
appWorkDir, metadataTags,
+ schedulerService, jobCatalog, Clock.systemUTC());
}
@Override
@@ -206,9 +245,9 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
if (cleanAllDistJobs) {
for (org.apache.gobblin.configuration.State state :
this.jobsMapping.getAllStates()) {
- String jobUri = state.getId();
- LOGGER.info("Delete mapping for job " + jobUri);
- this.jobsMapping.deleteMapping(jobUri);
+ String jobName = state.getId();
+ LOGGER.info("Delete mapping for job " + jobName);
+ this.jobsMapping.deleteMapping(jobName);
}
}
}
@@ -303,36 +342,70 @@ public class GobblinHelixJobScheduler extends
JobScheduler implements StandardMe
}
@Subscribe
- public void handleNewJobConfigArrival(NewJobConfigArrivalEvent
newJobArrival) {
- String jobUri = newJobArrival.getJobName();
- LOGGER.info("Received new job configuration of job " + jobUri);
+ public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent
newJobArrival) {
+ String jobName = newJobArrival.getJobName();
+ LOGGER.info("Received new job configuration of job " + jobName);
+
+ Instant nextSchedulableTime =
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
+ if (this.isThrottleEnabled &&
clock.instant().isBefore(nextSchedulableTime)) {
+ LOGGER.info("Adding new job is skipped for job {}. Current time is {}
and the next schedulable time would be {}",
+ jobName,
+ clock.instant(),
+ nextSchedulableTime
+ );
+ return;
+ }
+
+ if (isThrottleEnabled) {
+ nextSchedulableTime = clock.instant().plus(jobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobName, nextSchedulableTime);
+ }
+
try {
Properties jobProps = new Properties();
jobProps.putAll(this.commonJobProperties);
jobProps.putAll(newJobArrival.getJobConfig());
// set uri so that we can delete this job later
- jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
+ jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobName);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics,
jobNameToNextSchedulableTime)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
- LOGGER.info("Scheduling job " + jobUri);
- scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ LOGGER.info("Scheduling job " + jobName);
+ scheduleJob(jobProps, listener);
} else {
- LOGGER.info("No job schedule found, so running job " + jobUri);
- this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
- new
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+ LOGGER.info("No job schedule found, so running job " + jobName);
+ this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
listener));
}
} catch (JobException je) {
- LOGGER.error("Failed to schedule or run job " + jobUri, je);
+ LOGGER.error("Failed to schedule or run job {} . Reset the next
scheduable time to {}",
+ jobName,
+ Instant.EPOCH,
+ je);
+ if (isThrottleEnabled) {
+ jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH);
+ }
}
}
@Subscribe
- public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent
updateJobArrival) {
+ public synchronized void
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+
+ Instant nextSchedulableTime =
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
+ if (this.isThrottleEnabled &&
clock.instant().isBefore(nextSchedulableTime)) {
+ LOGGER.info("Replanning is skipped for job {}. Current time is {} and
the next schedulable time would be {}",
+ jobName,
+ clock.instant(),
+ nextSchedulableTime
+ );
+ return;
+ }
+
try {
handleDeleteJobConfigArrival(new
DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(),
updateJobArrival.getJobConfig()));
@@ -359,8 +432,17 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
}
}
+ /***
+ * Deleting a workflow with throttling enabled means that the next
+ * schedulable time for the workflow will remain unchanged.
+ * Note: In such a case, the caller must wait until the throttle
+ * timeout period elapses before the workflow can be rescheduled.
+ *
+ * @param deleteJobArrival
+ * @throws InterruptedException
+ */
@Subscribe
- public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent
deleteJobArrival) throws InterruptedException {
+ public synchronized void
handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival)
throws InterruptedException {
LOGGER.info("Received delete for job configuration of job " +
deleteJobArrival.getJobName());
try {
unscheduleJob(deleteJobArrival.getJobName());
@@ -373,8 +455,8 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
@Subscribe
public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent
cancelJobArrival)
throws InterruptedException {
- String jobUri = cancelJobArrival.getJoburi();
- LOGGER.info("Received cancel for job configuration of job " + jobUri);
+ String jobName = cancelJobArrival.getJoburi();
+ LOGGER.info("Received cancel for job configuration of job " + jobName);
Optional<String> distributedJobMode;
Optional<String> planningJob = Optional.empty();
Optional<String> actualJob = Optional.empty();
@@ -384,14 +466,14 @@ public class GobblinHelixJobScheduler extends
JobScheduler implements StandardMe
this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
try {
- distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri);
+ distributedJobMode = this.jobsMapping.getDistributedJobMode(jobName);
if (distributedJobMode.isPresent() &&
Boolean.parseBoolean(distributedJobMode.get())) {
- planningJob = this.jobsMapping.getPlanningJobId(jobUri);
+ planningJob = this.jobsMapping.getPlanningJobId(jobName);
} else {
- actualJob = this.jobsMapping.getActualJobId(jobUri);
+ actualJob = this.jobsMapping.getActualJobId(jobName);
}
} catch (IOException e) {
- LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri);
+ LOGGER.warn("jobsMapping could not be retrieved for job {}", jobName);
return;
}
@@ -466,7 +548,7 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis,
System.currentTimeMillis());
GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
} catch (JobException je) {
- LOGGER.error("Failed to run job " +
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+ LOGGER.error("Failed to schedule or run job " +
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
}
}
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java
new file mode 100644
index 000000000..ec00be19b
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener updates the shared map of job name to next schedulable time, which
+ * the scheduler consults to decide whether replanning is executed or skipped.
+ */
+@Slf4j
+public class GobblinThrottlingHelixJobLauncherListener extends
GobblinHelixJobLauncherListener {
+
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+
+ public
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics
jobLauncherMetrics,
+ ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime) {
+ super(jobLauncherMetrics);
+ this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+ }
+
+ @Override
+ public void onJobCompletion(JobContext jobContext)
+ throws Exception {
+ super.onJobCompletion(jobContext);
+ if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH);
+ LOG.info("{} failed. The next schedulable time is {} so that any future
schedule attempts will be allowed.",
+ jobContext.getJobName(), Instant.EPOCH);
+ }
+ }
+
+ @Override
+ public void onJobCancellation(JobContext jobContext)
+ throws Exception {
+ super.onJobCancellation(jobContext);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH);
+ LOG.info("{} is cancelled. The next schedulable time is {} so that any
future schedule attempts will be allowed.",
+ jobContext.getJobName(), Instant.EPOCH);
+ }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index d62002967..688c8c7ba 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -417,7 +417,11 @@ public class HelixUtils {
}
Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
for (String helixJob : helixJobs) {
- Iterator<TaskConfig> taskConfigIterator =
taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
+ JobConfig jobConfig = taskDriver.getJobConfig(helixJob);
+ if (jobConfig == null) {
+ throw new GobblinHelixUnexpectedStateException("Received null
jobConfig from Helix. We should not see any null configs when reading all
helixJobs. helixJob=%s", helixJob);
+ }
+ Iterator<TaskConfig> taskConfigIterator =
jobConfig.getTaskConfigMap().values().iterator();
if (taskConfigIterator.hasNext()) {
TaskConfig taskConfig = taskConfigIterator.next();
String jobName =
taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
index 498e2530a..e21c7e738 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
@@ -20,18 +20,24 @@ package org.apache.gobblin.cluster;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.nio.file.Files;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.assertj.core.util.Lists;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -52,16 +58,20 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
+import static org.mockito.Mockito.when;
+
/**
* Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
*
+ * In all test cases, we use GobblinHelixManagerFactory instead of
+ * HelixManagerFactory, and instantiate a local HelixManager per test to
+ * provide isolation and prevent errors caused by the ZKClient being shared
+ * (e.g. ZKClient is not connected exceptions).
*/
-@Test(groups = {"gobblin.cluster"})
+@Test(groups = {"gobblin.cluster"}, singleThreaded = true)
public class GobblinHelixJobSchedulerTest {
public final static Logger LOG =
LoggerFactory.getLogger(GobblinHelixJobSchedulerTest.class);
-
- private HelixManager helixManager;
private FileSystem localFs;
private Path appWorkDir;
private final Closer closer = Closer.create();
@@ -70,10 +80,17 @@ public class GobblinHelixJobSchedulerTest {
private GobblinTaskRunner gobblinTaskRunner;
private Thread thread;
-
private final String workflowIdSuffix1 = "_1504201348471";
private final String workflowIdSuffix2 = "_1504201348472";
+ private final Instant beginTime = Instant.EPOCH;
+ private final Duration withinThrottlePeriod = Duration.of(1,
ChronoUnit.SECONDS);
+ private final Duration exceedsThrottlePeriod = Duration.of(
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY
+ 1, ChronoUnit.SECONDS);
+
+ private String zkConnectingString;
+ private String helixClusterName;
+
@BeforeClass
public void setUp()
throws Exception {
@@ -96,17 +113,11 @@ public class GobblinHelixJobSchedulerTest {
ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
.withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE,
ConfigValueFactory.fromAnyRef("true")).resolve();
- String zkConnectingString =
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
- String helixClusterName =
baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ this.zkConnectingString =
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ this.helixClusterName =
baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
- this.helixManager = HelixManagerFactory
- .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
- zkConnectingString);
- this.closer.register(() -> helixManager.disconnect());
- this.helixManager.connect();
-
this.localFs = FileSystem.getLocal(new Configuration());
this.closer.register(() -> {
@@ -129,58 +140,192 @@ public class GobblinHelixJobSchedulerTest {
this.thread.start();
}
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
enabled
+ * Job will not be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ * Job will be updated
+ * @throws Exception
+ */
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ /***
+ * Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ * Job will be updated
+ * @throws Exception
+ */
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ Config helixJobSchedulerConfig =
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+ ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+ GobblinHelixJobScheduler gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog, clock);
+ return gobblinHelixJobScheduler;
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ return newJobConfigArrivalEvent;
+ }
+
+ private void connectAndAssertWorkflowId(String expectedSuffix, String
jobName, HelixManager helixManager) throws Exception {
+ helixManager.connect();
+ String workFlowId = getWorkflowID(jobName, helixManager);
+ Assert.assertNotNull(workFlowId);
+ Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+ }
- String workFlowId = null;
+ private String getWorkflowID (String jobName, HelixManager helixManager)
+ throws Exception {
+ // Poll Helix for up to 30 seconds until a workflow with a
matching job name exists in Helix, then return that workflow's ID
long endTime = System.currentTimeMillis() + 30000;
+ Map<String, String> workflowIdMap;
while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
+ try{
+ workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+ Collections.singletonList(jobName));
+ } catch(GobblinHelixUnexpectedStateException e){
+ continue;
+ }
+ if (workflowIdMap.containsKey(jobName)) {
+ return workflowIdMap.get(jobName);
}
Thread.sleep(100);
}
- Assert.assertNotNull(workFlowId);
- Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+ return null;
+ }
- jobScheduler.handleUpdateJobConfigArrival(
- new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
- this.helixManager.connect();
- endTime = System.currentTimeMillis() + 30000;
- while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
- }
- Thread.sleep(100);
+ private void runWorkflowTest(Duration mockStepAmountTime, String jobSuffix,
+ String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
+ String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean
isSameWorkflow) throws Exception {
+ Clock mockClock = Mockito.mock(Clock.class);
+ AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
+ when(mockClock.instant()).thenAnswer(invocation ->
nextInstant.getAndAccumulate(null, (currentInstant, x) ->
currentInstant.plus(mockStepAmountTime)));
+
+ // Use GobblinHelixManagerFactory instead of HelixManagerFactory to avoid
the connection error.
+ // helixManager is kept as a local variable so that each test gets its own instance, avoiding
the "HelixManager (ZkClient) is not connected" error across tests.
+ HelixManager helixManager = GobblinHelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager,
isThrottleEnabled, mockClock);
+ final Properties properties =
GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, jobSuffix,
newJobWorkflowIdSuffix);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(newJobWorkflowIdSuffix,
newJobConfigArrivalEvent.getJobName(), helixManager);
+
+ if (isSameWorkflow) {
+ properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+ "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
updateWorkflowIdSuffix);
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties));
+ connectAndAssertWorkflowId(assertUpdateWorkflowIdSuffix,
newJobConfigArrivalEvent.getJobName(), helixManager);
+ }
+ else {
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, jobSuffix + '2', updateWorkflowIdSuffix);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(assertUpdateWorkflowIdSuffix,
newJobConfigArrivalEvent2.getJobName(), helixManager);
}
- Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix2));
}
@AfterClass