This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0ad8af404908a54f7b98ee945bf2dda8e83f002f
Author: Hunter Lee <[email protected]>
AuthorDate: Thu Mar 28 12:31:25 2019 -0700

    TASK2.0: Job scheduling core pipeline fixes

    Task Framework 2.0 had stability issues and race conditions that were not
    handled correctly, and its integration with RuntimeJobDag had gaps that
    needed to be closed. This diff includes fixes and improvements that realize
    the intended performance gains and cut down on redundant computation.

    Changelist:
    1. Race condition where, when a job is enqueued, only the new JobConfig is
       updated and not the DAG. Added a two-way selective update that ensures
       consistency between JobConfigs and their parent DAGs (see the sketch
       after this list)
    2. Moved where getNextJob() is called in scheduleJobs() in
       WorkflowDispatcher. This ensures that once a RuntimeJobDag is rebuilt,
       the job update happens within one pipeline run, which removes extra
       scheduling delay
    3. Race condition where the job returned by getNextJob() is not
       schedulable. This is caused by deleting and then enqueuing a job of the
       same name: RuntimeJobDag still holds the old job name, which conflicts
       with the dependency in the new DAG. TestTaskRebalancerStopResume is
       fixed so that it no longer enqueues a job of the same name
    4. JobRebalancer was throwing an NPE when calling
       processJobStatusUpdateAndAssignment(), which sometimes made the
       Controller hang. Added a null check for JobConfig (the job could have
       been deleted or purged)
    5. Fixed a bug in isWorkflowStopped(): the TargetState comparison was
       inverted. This fixes TestRecurringJobQueue's
       testDeletingRecurrentQueueWithHistory(). Sometimes contexts do not get
       deleted cleanly, but this does not affect correctness
    6. Added TestEnqueueJobs
    7. Fixed unstable TestGetLastScheduledTaskExecInfo
    8. Other minor style fixes
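A minimal sketch of the consistency check behind item 1, kept separate from the
actual patch below (see the TaskDataCache diff for the real change). Given the
freshly read JobConfigs and the cached DAG membership, flag any workflow whose DAG
has not yet caught up with a submitted JobConfig. The types and the method name
here are simplified placeholders, not Helix APIs.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class DagConsistencySketch {
  /**
   * Returns the workflows whose RuntimeJobDag should be rebuilt because a job's
   * JobConfig exists but the cached DAG does not contain the job yet. This models
   * the race where TaskDriver creates the JobConfig ZNode first and only then
   * updates the parent JobDag.
   */
  static Set<String> workflowsNeedingRebuild(Map<String, String> jobToWorkflow,
      Map<String, Set<String>> cachedDagNodes) {
    Set<String> toRebuild = new HashSet<>();
    for (Map.Entry<String, String> entry : jobToWorkflow.entrySet()) {
      String job = entry.getKey();
      String workflow = entry.getValue();
      Set<String> dagNodes = cachedDagNodes.get(workflow);
      // JobConfig is visible but the cached DAG has not caught up: rebuild
      if (dagNodes != null && !dagNodes.contains(job)) {
        toRebuild.add(workflow);
      }
    }
    return toRebuild;
  }
}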
---
 .../apache/helix/common/caches/TaskDataCache.java  | 19 ++++-
 .../stages/task/TaskSchedulingStage.java           |  5 +-
 .../apache/helix/task/AbstractTaskDispatcher.java  |  2 +-
 .../java/org/apache/helix/task/JobRebalancer.java  | 12 ++-
 .../java/org/apache/helix/task/RuntimeJobDag.java  | 12 +--
 .../main/java/org/apache/helix/task/TaskUtil.java  | 11 ++-
 .../org/apache/helix/task/WorkflowDispatcher.java  |  8 +-
 .../org/apache/helix/task/WorkflowRebalancer.java  |  2 +-
 .../helix/integration/task/TaskTestUtil.java       |  4 +-
 .../helix/integration/task/TestEnqueueJobs.java    | 99 ++++++++++++++++++++++
 .../integration/task/TestRecurringJobQueue.java    |  5 +-
 .../task/TestTaskRebalancerStopResume.java         | 16 ++--
 .../task/TestGetLastScheduledTaskExecInfo.java     |  2 +-
 13 files changed, 161 insertions(+), 36 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 31319ca..5c29124 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -19,7 +19,6 @@ package org.apache.helix.common.caches;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,7 +38,6 @@ import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.RuntimeJobDag;
-import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -128,6 +126,23 @@ public class TaskDataCache extends AbstractDataCache {
       if (!_jobConfigMap.containsKey(jobName)
           && newJobConfigs.get(jobName).getWorkflow() != null) {
         workflowsUpdated.add(newJobConfigs.get(jobName).getWorkflow());
       }
+
+      // Only for JobQueues, when a new job is enqueued, there exists a race condition where only
+      // the JobConfig is updated and the RuntimeJobDag does not get updated, because when the
+      // client (TaskDriver) submits, it creates the JobConfig ZNode first and modifies its parent
+      // JobDag next. To ensure that they are both properly updated, check that the workflow's DAG
+      // and existing JobConfigs are consistent for JobQueues
+      JobConfig jobConfig = newJobConfigs.get(jobName);
+      if (_workflowConfigMap.containsKey(jobConfig.getWorkflow())) {
+        WorkflowConfig workflowConfig = _workflowConfigMap.get(jobConfig.getWorkflow());
+        // Check that the job's parent workflow's DAG contains this job
+        if ((workflowConfig.isJobQueue() || !workflowConfig.isTerminable()) && !_runtimeJobDagMap
+            .get(workflowConfig.getWorkflowId()).getAllNodes().contains(jobName)) {
+          // Inconsistency between JobConfigs and DAGs found. Add the workflow to workflowsUpdated
+          // to rebuild the RuntimeJobDag
+          workflowsUpdated.add(jobConfig.getWorkflow());
+        }
+      }
     }
 
     // Removed jobs
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index bbc2a2f..94af50d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -259,9 +259,8 @@ public class TaskSchedulingStage extends AbstractBaseStage {
       String quotaType = getQuotaType(cache.getWorkflowConfig(workflowId));
       restOfResources.remove(workflowId);
       if (assignableInstanceManager.hasGlobalCapacity(quotaType)) {
-        _workflowDispatcher
-            .assignWorkflow(workflowId, cache.getWorkflowConfig(workflowId), context,
-                currentStateOutput, bestPossibleOutput, resourceMap);
+        _workflowDispatcher.assignWorkflow(workflowId, cache.getWorkflowConfig(workflowId),
+            context, currentStateOutput, bestPossibleOutput);
       } else {
         LogUtil.logInfo(logger, _eventId, String.format(
             "Fail to schedule new jobs assignment for Workflow %s due to quota %s is full",
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 4de8112..78a7419 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -1065,7 +1065,7 @@ public abstract class AbstractTaskDispatcher {
    */
   protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
     if (cfg.isRecurring()) {
-      return cfg.getTargetState() == TargetState.START;
+      return cfg.getTargetState() == TargetState.STOP;
     }
     for (String job : cfg.getJobDag().getAllNodes()) {
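A quick illustration of the isWorkflowStopped() fix above: for a recurring workflow,
"stopped" should hold exactly when the target state is STOP, but the old code
compared against START, inverting the result. A self-contained sketch with the enum
inlined for compactness (the real TargetState lives in org.apache.helix.task):

enum TargetState {
  START, STOP, DELETE
}

class WorkflowStoppedSketch {
  static boolean isRecurringWorkflowStopped(TargetState targetState) {
    // Before the fix this read "targetState == TargetState.START", so a running
    // recurring workflow was reported as stopped, and a stopped one as running.
    return targetState == TargetState.STOP;
  }

  public static void main(String[] args) {
    System.out.println(isRecurringWorkflowStopped(TargetState.STOP));  // true
    System.out.println(isRecurringWorkflowStopped(TargetState.START)); // false
  }
}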
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 0acf825..6d80229 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -40,6 +40,13 @@ public class JobRebalancer extends TaskRebalancer {
       CurrentStateOutput currStateOutput) {
     long startTime = System.currentTimeMillis();
     final String jobName = resource.getResourceName();
+    JobConfig jobConfig = clusterData.getJobConfig(jobName);
+    if (jobConfig == null) {
+      LOG.error(
+          "Job {}'s JobConfig is missing. This job might have been deleted or purged. Skipping status update and assignment!",
+          jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
     LOG.debug("Computer Best Partition for job: " + jobName);
     if (_jobDispatcher == null) {
       _jobDispatcher = new JobDispatcher();
@@ -47,9 +54,8 @@ public class JobRebalancer extends TaskRebalancer {
     _jobDispatcher.init(_manager);
     _jobDispatcher.updateCache(clusterData);
     _jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
-    ResourceAssignment resourceAssignment = _jobDispatcher
-        .processJobStatusUpdateAndAssignment(jobName, currStateOutput,
-            clusterData.getWorkflowContext(clusterData.getJobConfig(jobName).getWorkflow()));
+    ResourceAssignment resourceAssignment = _jobDispatcher.processJobStatusUpdateAndAssignment(
+        jobName, currStateOutput, clusterData.getWorkflowContext(jobConfig.getWorkflow()));
     LOG.debug(String.format("JobRebalancer computation takes %d ms for Job %s",
         System.currentTimeMillis() - startTime, jobName));
     return resourceAssignment;
diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
index e63e3b4..c223a29 100644
--- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
@@ -20,26 +20,19 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayDeque;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
 import java.util.Set;
 import java.util.HashSet;
 
-import org.apache.helix.HelixException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * RuntimeJobDag is a job DAG that provides the job iterator functionality at runtime (when jobs are
  * actually being assigned per job category). This is to support assignment of jobs based on their
  * categories and quotas. RuntimeJobDag uses the list scheduling algorithm using ready-list and
  * inflight-list to return jobs available for scheduling.
- *
  * NOTE: RuntimeJobDag is not thread-safe.
  */
 public class RuntimeJobDag extends JobDag {
@@ -125,11 +118,15 @@ public class RuntimeJobDag extends JobDag {
     }
     // If list is empty, return null
     if (_readyJobList.isEmpty()) {
+      return null;
     }
     String nextJob = _readyJobList.poll();
     _inflightJobList.add(nextJob);
     _lastJob = nextJob;
+
+
+
     return nextJob;
   }
 
@@ -212,5 +209,4 @@ public class RuntimeJobDag extends JobDag {
   public Set<String> getInflightJobList() {
     return new HashSet<>(_inflightJobList);
   }
-
 }
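The RuntimeJobDag javadoc above describes list scheduling with a ready-list and an
inflight-list. Here is a minimal model of that iterator contract, not the actual
RuntimeJobDag implementation, showing why getNextJob() returning null on an empty
ready-list (the fix above) is the natural "nothing schedulable right now" signal:

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

class ListSchedulerSketch {
  private final Queue<String> _readyJobList = new ArrayDeque<>();
  private final Set<String> _inflightJobList = new HashSet<>();

  void markReady(String job) {
    _readyJobList.offer(job);
  }

  /** Returns the next ready job, or null when nothing is ready. */
  String getNextJob() {
    if (_readyJobList.isEmpty()) {
      return null; // callers treat null as "no schedulable job this round"
    }
    String nextJob = _readyJobList.poll();
    _inflightJobList.add(nextJob); // track the job until it completes
    return nextJob;
  }

  /** Called on job completion so its children can later be marked ready. */
  void finishJob(String job) {
    _inflightJobList.remove(job);
  }

  public static void main(String[] args) {
    ListSchedulerSketch dag = new ListSchedulerSketch();
    dag.markReady("JOB0");
    System.out.println(dag.getNextJob()); // JOB0
    System.out.println(dag.getNextJob()); // null: nothing ready
  }
}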
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d15cf8f..5da9fc5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -654,7 +654,7 @@ public class TaskUtil {
    * @return True if remove success, otherwise false
    */
   protected static boolean removeWorkflow(final HelixDataAccessor accessor,
-      final HelixPropertyStore propertyStore, String workflow, Set<String> jobs) {
+      final HelixPropertyStore<ZNRecord> propertyStore, String workflow, Set<String> jobs) {
     // clean up all jobs
     for (String job : jobs) {
       if (!removeJob(accessor, propertyStore, job)) {
@@ -724,9 +724,9 @@ public class TaskUtil {
    * @return
    */
   protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
-      HelixPropertyStore propertyStore, WorkflowConfig workflowConfig,
+      HelixPropertyStore<ZNRecord> propertyStore, WorkflowConfig workflowConfig,
       WorkflowContext workflowContext) {
-    Set<String> expiredJobs = new HashSet<String>();
+    Set<String> expiredJobs = new HashSet<>();
 
     if (workflowContext != null) {
       Map<String, TaskState> jobStates = workflowContext.getJobStates();
@@ -742,7 +742,7 @@ public class TaskUtil {
           continue;
         }
         long expiry = jobConfig.getExpiry();
-        if (expiry == workflowConfig.DEFAULT_EXPIRY || expiry < 0) {
+        if (expiry == WorkflowConfig.DEFAULT_EXPIRY || expiry < 0) {
           expiry = workflowConfig.getExpiry();
         }
         if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
@@ -822,7 +822,7 @@ public class TaskUtil {
   /**
    * update workflow's property to remove jobs from JOB_STATES if they are already started.
   */
-  protected static boolean removeJobsState(final HelixPropertyStore propertyStore,
+  protected static boolean removeJobsState(final HelixPropertyStore<ZNRecord> propertyStore,
       final String workflow, final Set<String> jobs) {
     String contextPath =
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
@@ -983,7 +983,6 @@ public class TaskUtil {
    * @param workflowConfig
    * @param workflowContext
    */
-
   public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
       WorkflowContext workflowContext, HelixManager manager,
       RebalanceScheduler rebalanceScheduler) {
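One small cleanup above replaces workflowConfig.DEFAULT_EXPIRY with
WorkflowConfig.DEFAULT_EXPIRY. A tiny self-contained illustration of why
class-qualified access to static members is preferred; both forms compile to the
same constant read, but the instance-qualified form misleadingly suggests the value
depends on the instance:

class StaticAccessSketch {
  static final long DEFAULT_EXPIRY = -1L;

  public static void main(String[] args) {
    StaticAccessSketch cfg = new StaticAccessSketch();
    // Misleading: reads the static field through an instance reference
    long viaInstance = cfg.DEFAULT_EXPIRY;
    // Clear: the constant belongs to the class, not to cfg
    long viaClass = StaticAccessSketch.DEFAULT_EXPIRY;
    System.out.println(viaInstance == viaClass); // true
  }
}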
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 80d9afb..51b21eb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -174,7 +174,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
 
   public void assignWorkflow(String workflow, WorkflowConfig workflowCfg,
       WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
-      BestPossibleStateOutput bestPossibleOutput, Map<String, Resource> resourceMap) {
+      BestPossibleStateOutput bestPossibleOutput) {
     // Fetch workflow configuration and context
     if (workflowCfg == null) {
       // Already logged in status update.
@@ -240,13 +240,13 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     // Assign new jobs
     while (nextJob != null) {
       String job = nextJob;
-      nextJob = jobDag.getNextJob();
       TaskState jobState = workflowCtx.getJobState(job);
       if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Job " + job + " is already started or completed.");
         }
         processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx);
+        nextJob = jobDag.getNextJob();
         continue;
       }
 
@@ -258,6 +258,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         break;
       }
 
+      // TODO: Part of isJobReadyToSchedule() is already done by RuntimeJobDag. Because there is
+      // some duplicate logic, consider refactoring. The check here and the ready-list in
+      // RuntimeJobDag may cause conflicts.
       // check ancestor job status
       if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap,
           clusterDataCache, clusterDataCache.getAssignableInstanceManager())) {
@@ -288,6 +291,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
           scheduledJobs++;
         }
       }
+      nextJob = jobDag.getNextJob();
     }
 
     long currentScheduledTime =
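To make the control-flow change above easier to follow, here is an outline of the
revised loop shape. It is a sketch only: isAlreadyStarted and tryScheduleJob are
hypothetical stand-ins for the dispatcher's real checks, and a plain queue stands in
for RuntimeJobDag:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

class SchedulingLoopSketch {
  static void scheduleJobs(Queue<String> readyJobs) {
    String nextJob = readyJobs.poll();
    while (nextJob != null) {
      String job = nextJob;
      if (isAlreadyStarted(job)) {
        // Poll the next job only after the current one is fully handled; polling
        // eagerly at the top of the loop could pull a job into the inflight list
        // and then abandon it when the loop breaks early.
        nextJob = readyJobs.poll();
        continue;
      }
      tryScheduleJob(job);
      nextJob = readyJobs.poll();
    }
  }

  private static boolean isAlreadyStarted(String job) {
    return false; // placeholder for the real workflow-context state check
  }

  private static void tryScheduleJob(String job) {
    System.out.println("scheduling " + job); // placeholder for real assignment
  }

  public static void main(String[] args) {
    scheduleJobs(new ArrayDeque<>(Arrays.asList("JOB0", "JOB1", "JOB2")));
  }
}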
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index d0c4381..2411b39 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -54,7 +54,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, workflowCtx, currStateOutput,
         new BestPossibleStateOutput());
     _workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx, currStateOutput,
-        new BestPossibleStateOutput(), new HashMap<String, Resource>());
+        new BestPossibleStateOutput());
 
     LOG.debug(String.format("WorkflowRebalancer computation takes %d ms for workflow %s",
         System.currentTimeMillis() - startTime, workflow));
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 02a7628..47b7cb8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -206,7 +206,7 @@ public class TaskTestUtil {
   }
 
   public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
-      int recurrenInSeconds, TargetState targetState) {
+      int recurrenceInSeconds, TargetState targetState) {
     WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder(jobQueueName);
     workflowCfgBuilder.setExpiry(120000);
     if (targetState != null) {
@@ -218,7 +218,7 @@ public class TaskTestUtil {
     cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
     cal.set(Calendar.MILLISECOND, 0);
     ScheduleConfig scheduleConfig =
-        ScheduleConfig.recurringFromDate(cal.getTime(), TimeUnit.SECONDS, recurrenInSeconds);
+        ScheduleConfig.recurringFromDate(cal.getTime(), TimeUnit.SECONDS, recurrenceInSeconds);
     workflowCfgBuilder.setScheduleConfig(scheduleConfig);
     return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
new file mode 100644
index 0000000..28ee51d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
@@ -0,0 +1,99 @@
+package org.apache.helix.integration.task;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestEnqueueJobs extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  @Test
+  public void testJobQueueAddingJobsOneByOne() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder =
+        new WorkflowConfig.Builder().setWorkflowId(queueName).setParallelJobs(1);
+    _driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2);
+    _driver.enqueueJob(queueName, "JOB0", jobBuilder);
+    for (int i = 1; i < 5; i++) {
+      _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + (i - 1)),
+          10000L, TaskState.COMPLETED);
+      _driver.waitToStop(queueName, 5000L);
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+      _driver.resume(queueName);
+    }
+
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
+        TaskState.COMPLETED);
+  }
+
+  @Test
+  public void testJobQueueAddingJobsAtSametime() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder =
+        new WorkflowConfig.Builder().setWorkflowId(queueName).setParallelJobs(1);
+    _driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+
+    // Adding jobs
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2);
+    _driver.waitToStop(queueName, 5000L);
+    for (int i = 0; i < 5; i++) {
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+    _driver.resume(queueName);
+
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
+        TaskState.COMPLETED);
+  }
+
+  @Test
+  public void testJobSubmitGenericWorkflows() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2);
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    for (int i = 0; i < 5; i++) {
+      builder.addJob("JOB" + i, jobBuilder);
+    }
+
+    /**
+     * Dependency visualization
+     * JOB0
+     *
+     * / | \
+     *
+     * JOB1 <-JOB2  JOB4
+     *
+     * |    /
+     *
+     * JOB3
+     */
+    builder.addParentChildDependency("JOB0", "JOB1");
+    builder.addParentChildDependency("JOB0", "JOB2");
+    builder.addParentChildDependency("JOB0", "JOB4");
+    builder.addParentChildDependency("JOB1", "JOB2");
+    builder.addParentChildDependency("JOB2", "JOB3");
+    builder.addParentChildDependency("JOB4", "JOB3");
+    _driver.start(builder.build());
+
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+  }
+}
\ No newline at end of file
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 5eba70a..6d05d38 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -274,12 +274,15 @@ public class TestRecurringJobQueue extends TaskTestBase {
 
     // Record all scheduled workflows
     wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
-    List<String> scheduledWorkflows = new ArrayList<String>(wCtx.getScheduledWorkflows());
+    List<String> scheduledWorkflows = new ArrayList<>(wCtx.getScheduledWorkflows());
     final String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
 
     // Delete recurrent workflow
     _driver.delete(queueName);
 
+    // Try to delete again to make sure things are cleaned up
+    _driver.delete(queueName);
+
     // Wait until recurrent workflow and the last scheduled workflow are cleaned up
     boolean result = TestHelper.verify(new TestHelper.Verifier() {
       @Override
       public boolean verify() throws Exception {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 5bd3392..5f5c6a3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -231,12 +231,16 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     currentJobNames.remove(deletedJob2);
 
     // add job 3 back
-    JobConfig.Builder job =
-        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(Sets.newHashSet("SLAVE"));
-    LOG.info("Enqueuing job: " + deletedJob2);
-    _driver.enqueueJob(queueName, deletedJob2, job);
-    currentJobNames.add(deletedJob2);
+    JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+
+    // the name here MUST be unique in order to avoid conflicts with the old job cached in
+    // RuntimeJobDag
+    String newJob = deletedJob2 + "_second";
+    LOG.info("Enqueuing job: " + newJob);
+    _driver.enqueueJob(queueName, newJob, job);
+    currentJobNames.add(newJob);
 
     // Ensure the jobs left are successful completed in the correct order
     long preJobFinish = 0;
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
index 25c486a..a73f02d 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
@@ -60,8 +60,8 @@ public class TestGetLastScheduledTaskExecInfo extends TaskTestBase {
     List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10);
     // API call needs to return the most recent timestamp (value at last index)
     lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_3");
-    execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3");
     Thread.sleep(200); // Let the tasks run
+    execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3");
     Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1),
         lastScheduledTaskTs);
     Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_3_job_0");
