This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 0799900 Fix adding same job multiple times to RuntimeJobDag so
parallelJobs config will be honored (#1006)
0799900 is described below
commit 07999003c7832414822cabe2cb418cde42f2bd0f
Author: Ali Reza Zamani Zadeh Najari <[email protected]>
AuthorDate: Tue May 26 10:12:54 2020 -0700
Fix adding same job multiple times to RuntimeJobDag so parallelJobs config
will be honored (#1006)
In this commit, a fix has been implemented to prevent
_readyJobList in RuntimeJobDag from containing multiple
entries of the same job.
The investigation for this fix was motivated by the observation that
JobQueues' parallelJobs config wasn't being honored - it was only processing
jobs sequentially one by one. This commit fixes this.
---
.../java/org/apache/helix/task/RuntimeJobDag.java | 16 +++++-
.../helix/integration/task/TestEnqueueJobs.java | 65 +++++++++++++++++++++-
2 files changed, 78 insertions(+), 3 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
index b21723b..b5703f6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
@@ -145,9 +145,21 @@ public class RuntimeJobDag extends JobDag {
String.format("Job: %s has either finished already, never been
scheduled, or been removed from DAG", job));
}
// Add finished job's successors to ready-list
+
if (_isJobQueue) {
- if (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
-
_readyJobList.offer(_parentsToChildren.get(_lastJob).iterator().next());
+ // If it is a jobQueue, there should be a check to make sure that a
job has not been added
+ // to the _readyJobList multiple times. This check is necessary because
once the controller
+ // switch happens, the _readyJobList and _inflightJobList will be
created from scratch. In this
+ // case, since there might be many jobs that have been finished before,
we do not want to add a
+ // job several times to the _readyJobList. If _readyJobList has multiple
instances of the same
+ // job, it can compromise the functionality of the parallel jobs.
+ while (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
+ String nextJob = _parentsToChildren.get(_lastJob).iterator().next();
+ if (!_readyJobList.contains(nextJob)) {
+ _readyJobList.offer(nextJob);
+ break;
+ }
+ _lastJob = nextJob;
}
} else if (_successorMap.containsKey(job)) {
for (String successor : _successorMap.get(job)) {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
index 28ee51d..4d88a5a 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
@@ -1,12 +1,16 @@
package org.apache.helix.integration.task;
+import java.util.Collections;
import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
+import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -96,4 +100,63 @@ public class TestEnqueueJobs extends TaskTestBase {
_driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
}
-}
\ No newline at end of file
+
+ @Test
+ public void testQueueParallelJobs() throws InterruptedException {
+ final int parallelJobs = 3;
+ final int numberOfJobsAddedBeforeControllerSwitch = 4;
+ final int totalNumberOfJobs = 7;
+ String queueName = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+ WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder()
+
.setWorkflowId(queueName).setParallelJobs(parallelJobs).setAllowOverlapJobAssignment(true);
+
_driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+ JobConfig.Builder jobBuilder =
+ new
JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+
.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "10000"));
+
+ // Add 4 jobs to the queue
+ for (int i = 0; i < numberOfJobsAddedBeforeControllerSwitch; i++) {
+ _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+ }
+
+ // Wait until all of the enqueued jobs (Job0 to Job3) are finished
+ for (int i = 0; i < numberOfJobsAddedBeforeControllerSwitch; i++) {
+ _driver.pollForJobState(queueName,
TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+ TaskState.COMPLETED);
+ }
+
+ // Stop the Controller
+ _controller.syncStop();
+
+ // Add 3 more jobs to the queue which should run in parallel after the
Controller is started
+ for (int i = numberOfJobsAddedBeforeControllerSwitch; i <
totalNumberOfJobs; i++) {
+ _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+ }
+
+ // Start the Controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ // Wait until all of the newly added jobs (Job4 to Job6) are finished
+ for (int i = numberOfJobsAddedBeforeControllerSwitch; i <
totalNumberOfJobs; i++) {
+ _driver.pollForJobState(queueName,
TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+ TaskState.COMPLETED);
+ }
+
+ // Make sure the jobs have been running in parallel by checking the jobs
start time and finish
+ // time
+ long maxStartTime = Long.MIN_VALUE;
+ long minFinishTime = Long.MAX_VALUE;
+
+ for (int i = numberOfJobsAddedBeforeControllerSwitch; i <
totalNumberOfJobs; i++) {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(queueName, "JOB"
+ i));
+ maxStartTime = Long.max(maxStartTime, jobContext.getStartTime());
+ minFinishTime = Long.min(minFinishTime, jobContext.getFinishTime());
+ }
+ Assert.assertTrue(minFinishTime > maxStartTime);
+ }
+}