[HELIX-616] Change JobQueue to be subclass of Workflow instead of WorkflowConfig.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bbb20be Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bbb20be Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bbb20be Branch: refs/heads/helix-0.6.x Commit: 7bbb20be67a939a57f33d8f6d7c814b1dc246575 Parents: 7569a0a Author: Lei Xia <[email protected]> Authored: Fri Nov 20 15:54:34 2015 -0800 Committer: Lei Xia <[email protected]> Committed: Fri Nov 20 15:54:34 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobQueue.java | 66 ++++++++++---------- .../java/org/apache/helix/task/TaskDriver.java | 42 ++++++++----- .../java/org/apache/helix/task/TaskRunner.java | 5 +- .../org/apache/helix/task/TaskStateModel.java | 8 +-- .../java/org/apache/helix/task/Workflow.java | 55 +++++++++++----- .../org/apache/helix/task/WorkflowConfig.java | 14 +++++ .../task/TestIndependentTaskRebalancer.java | 1 - .../integration/task/TestRecurringJobQueue.java | 52 ++++++++------- .../task/TestTaskRebalancerParallel.java | 4 +- 9 files changed, 155 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/JobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java index bca5911..0280c88 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java @@ -19,31 +19,26 @@ package org.apache.helix.task; * under the License. */ +import org.apache.helix.HelixException; + +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** * A named queue to which jobs can be added */ -public class JobQueue extends WorkflowConfig { +public class JobQueue extends Workflow { /* Config fields */ public static final String CAPACITY = "CAPACITY"; - private final String _name; private final int _capacity; - private JobQueue(String name, int capacity, WorkflowConfig config) { - super(config.getJobDag(), config.getParallelJobs(), config.getTargetState(), config.getExpiry(), - config.isTerminable(), config.getScheduleConfig()); - _name = name; + private JobQueue(String name, int capacity, WorkflowConfig workflowConfig, + Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) { + super(name, workflowConfig, jobConfigs, taskConfigs); _capacity = capacity; - } - - /** - * Get the name of this queue - * @return queue name - */ - public String getName() { - return _name; + validate(); } /** @@ -54,31 +49,24 @@ public class JobQueue extends WorkflowConfig { return _capacity; } - @Override public Map<String, String> getResourceConfigMap() throws Exception { - Map<String, String> cfgMap = super.getResourceConfigMap(); + Map<String, String> cfgMap = _workflowConfig.getResourceConfigMap(); cfgMap.put(CAPACITY, String.valueOf(_capacity)); return cfgMap; } - /** Supports creation of a single empty queue */ - public static class Builder { - private WorkflowConfig.Builder _builder; - private final String _name; + /** Supports creation of a single queue */ + public static class Builder extends Workflow.Builder { private int _capacity = Integer.MAX_VALUE; + private List<String> jobs; public Builder(String name) { - _builder = new WorkflowConfig.Builder(); - _name = name; - } - - public Builder parallelJobs(int parallelJobs) { - _builder.setParallelJobs(parallelJobs); - return this; + super(name); + jobs = new ArrayList<String>(); } public Builder expiry(long expiry) { - _builder.setExpiry(expiry); + _expiry = expiry; return this; } @@ -87,18 +75,32 @@ public class JobQueue extends WorkflowConfig { return this; } + @Override public Builder fromMap(Map<String, String> cfg) { - _builder = WorkflowConfig.Builder.fromMap(cfg); + super.fromMap(cfg); if (cfg.containsKey(CAPACITY)) { _capacity = Integer.parseInt(cfg.get(CAPACITY)); } return this; } + public void enqueueJob(final String job, JobConfig.Builder jobBuilder) { + if (jobs.size() >= _capacity) { + throw new HelixException("Failed to push new job to jobQueue, it is already full"); + } + addJobConfig(job, jobBuilder); + if (jobs.size() > 0) { + String previousJob = jobs.get(jobs.size() - 1); + addParentChildDependency(previousJob, job); + } + jobs.add(job); + } + public JobQueue build() { - _builder.setTerminable(false); - WorkflowConfig workflowConfig = _builder.build(); - return new JobQueue(_name, _capacity, workflowConfig); + WorkflowConfig.Builder builder = buildWorkflowConfig(); + builder.setTerminable(false); + WorkflowConfig workflowConfig = builder.build(); + return new JobQueue(_name, _capacity, workflowConfig, _jobConfigs, _taskConfigs); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index cc1eac1..654ba4e 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -191,8 +191,8 @@ public class TaskDriver { String flowName = flow.getName(); // first, add workflow config to ZK - _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName), flow - .getWorkflowConfig().getResourceConfigMap()); + _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName), + flow.getWorkflowConfig().getResourceConfigMap()); // then schedule jobs for (String job : flow.getJobConfigs().keySet()) { @@ -206,14 +206,7 @@ public class TaskDriver { /** Creates a new named job queue (workflow) */ public void createQueue(JobQueue queue) throws Exception { - String queueName = queue.getName(); - HelixProperty property = new HelixProperty(queueName); - property.getRecord().getSimpleFields().putAll(queue.getResourceConfigMap()); - boolean created = - _accessor.createProperty(_accessor.keyBuilder().resourceConfig(queueName), property); - if (!created) { - throw new IllegalArgumentException("Queue " + queueName + " already exists!"); - } + start(queue); } /** Flushes a named job queue */ @@ -566,20 +559,35 @@ public class TaskDriver { setWorkflowTargetState(workflow, TargetState.DELETE); } - /** Helper function to change target state for a given workflow */ + /** + * Helper function to change target state for a given workflow + */ private void setWorkflowTargetState(String workflowName, TargetState state) { setSingleWorkflowTargetState(workflowName, state); + // TODO: this is the temporary fix for current task rebalance implementation. + // We should fix this in new task framework implementation. + List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs()); + for (String resource : resources) { + if (resource.startsWith(workflowName)) { + setSingleWorkflowTargetState(resource, state); + } + } + + /* TODO: use this code for new task framework. // For recurring schedules, last scheduled incomplete workflow must also be handled WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName); String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow(); - WorkflowContext lastScheduledWorkflowCtx = - TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow); - if (lastScheduledWorkflowCtx != null && - !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED - || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) { - setSingleWorkflowTargetState(lastScheduledWorkflow, state); + if (lastScheduledWorkflow != null) { + WorkflowContext lastScheduledWorkflowCtx = + TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow); + if (lastScheduledWorkflowCtx != null && !( + lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED + || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) { + setSingleWorkflowTargetState(lastScheduledWorkflow, state); + } } + */ } /** Helper function to change target state for a given workflow */ http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java index abd1882..7b17043 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java @@ -48,6 +48,7 @@ public class TaskRunner implements Runnable { // If true, indicates that the task has finished. private volatile boolean _done = false; + public TaskRunner(Task task, String taskName, String taskPartition, String instance, HelixManager manager, String sessionId) { _task = task; @@ -111,7 +112,9 @@ public class TaskRunner implements Runnable { * Signals the task to cancel itself. */ public void cancel() { - _task.cancel(); + if (!_done) { + _task.cancel(); + } } /** http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index 30939fc..525a38b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -205,22 +205,22 @@ public class TaskStateModel extends StateModel { @Transition(to = "INIT", from = "COMPLETED") public void onBecomeInitFromCompleted(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Transition(to = "INIT", from = "STOPPED") public void onBecomeInitFromStopped(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Transition(to = "INIT", from = "TIMED_OUT") public void onBecomeInitFromTimedOut(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Transition(to = "INIT", from = "TASK_ERROR") public void onBecomeInitFromTaskError(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index f69605e..259b72c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -49,19 +49,19 @@ public class Workflow { public static final String UNSPECIFIED = "UNSPECIFIED"; /** Workflow name */ - private String _name; + protected String _name; /** Holds workflow-level configurations */ - private WorkflowConfig _workflowConfig; + protected WorkflowConfig _workflowConfig; /** Contains the per-job configurations for all jobs specified in the provided dag */ - private Map<String, Map<String, String>> _jobConfigs; + protected Map<String, Map<String, String>> _jobConfigs; /** Containers the per-job configurations of all individually-specified tasks */ - private Map<String, List<TaskConfig>> _taskConfigs; + protected Map<String, List<TaskConfig>> _taskConfigs; /** Constructs and validates a workflow against a provided dag and config set */ - private Workflow(String name, WorkflowConfig workflowConfig, + protected Workflow(String name, WorkflowConfig workflowConfig, Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) { _name = name; _workflowConfig = workflowConfig; @@ -225,12 +225,14 @@ public class Workflow { /** Build a workflow incrementally from dependencies and single configs, validate at build time */ public static class Builder { - private String _name; - private JobDag _dag; - private Map<String, Map<String, String>> _jobConfigs; - private Map<String, List<TaskConfig>> _taskConfigs; - private ScheduleConfig _scheduleConfig; - private long _expiry; + protected String _name; + protected JobDag _dag; + protected Map<String, Map<String, String>> _jobConfigs; + protected Map<String, List<TaskConfig>> _taskConfigs; + protected ScheduleConfig _scheduleConfig; + protected long _expiry; + protected Map<String, String> _cfgMap; + protected int _parallelJobs = -1; public Builder(String name) { _name = name; @@ -287,6 +289,11 @@ public class Workflow { return this; } + public Builder fromMap(Map<String, String> cfg) { + _cfgMap = cfg; + return this; + } + public Builder setScheduleConfig(ScheduleConfig scheduleConfig) { _scheduleConfig = scheduleConfig; return this; @@ -301,13 +308,30 @@ public class Workflow { return TaskUtil.getNamespacedJobName(_name, job); } + public Builder parallelJobs(int parallelJobs) { + _parallelJobs = parallelJobs; + return this; + } + public Workflow build() { + WorkflowConfig.Builder builder = buildWorkflowConfig(); + // calls validate internally + return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); + } + + protected WorkflowConfig.Builder buildWorkflowConfig() { for (String task : _jobConfigs.keySet()) { // addConfig(task, TaskConfig.WORKFLOW_ID, _name); _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name); } - WorkflowConfig.Builder builder = new WorkflowConfig.Builder(); + WorkflowConfig.Builder builder; + if (_cfgMap != null) { + builder = WorkflowConfig.Builder.fromMap(_cfgMap); + } else { + builder = new WorkflowConfig.Builder(); + } + builder.setJobDag(_dag); builder.setTargetState(TargetState.START); if (_scheduleConfig != null) { @@ -316,8 +340,11 @@ public class Workflow { if (_expiry > 0) { builder.setExpiry(_expiry); } - return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate - // internally + if (_parallelJobs != -1) { + builder.setParallelJobs(_parallelJobs); + } + + return builder; } } } http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java index f15f235..56fba58 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java @@ -100,6 +100,20 @@ public class WorkflowConfig { return defaultDateFormat; } + /** + * Get the scheduled start time of the workflow. + * + * @return start time if the workflow schedule is set, null if no schedule config set. + */ + public Date getStartTime() { + // Workflow with non-scheduled config is ready to schedule immediately. + if (_scheduleConfig == null) { + return null; + } + + return _scheduleConfig.getStartTime(); + } + public Map<String, String> getResourceConfigMap() throws Exception { Map<String, String> cfgMap = new HashMap<String, String>(); cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson()); http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index 1f17e92..a00a736 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -274,7 +274,6 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { _driver.start(workflowBuilder.build()); // Ensure the job completes - TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); // Ensure that the class was invoked http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java index eef1ce6..38c9113 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java @@ -179,7 +179,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { return cal.getTime(); } - private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) { + private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) { Map<String, String> cfgMap = new HashMap<String, String>(); cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000)); cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60)); @@ -191,11 +191,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { cfgMap.put(WorkflowConfig.START_TIME, WorkflowConfig.getDefaultDateFormat().format(cal.getTime())); //cfgMap.put(WorkflowConfig.START_TIME, - //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00"))); - return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build(); + //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00"))); + return new JobQueue.Builder(jobQueueName).fromMap(cfgMap); } - private JobQueue buildRecurrentJobQueue(String jobQueueName) { + + private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) { return buildRecurrentJobQueue(jobQueueName, 0); } @@ -205,9 +206,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { // Create a queue LOG.info("Starting job-queue: " + queueName); - JobQueue queue = buildRecurrentJobQueue(queueName); - _driver.createQueue(queue); - + JobQueue.Builder queueBuild = buildRecurrentJobQueue(queueName); // Create and Enqueue jobs List<String> currentJobNames = new ArrayList<String>(); for (int i = 0; i <= 1; i++) { @@ -218,10 +217,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) .setTargetPartitionStates(Sets.newHashSet(targetPartition)); String jobName = targetPartition.toLowerCase() + "Job" + i; - _driver.enqueueJob(queueName, jobName, job); + queueBuild.enqueueJob(jobName, job); currentJobNames.add(jobName); } + _driver.start(queueBuild.build()); + WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); // ensure job 1 is started before stop it @@ -234,8 +235,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { _driver.delete(queueName); Thread.sleep(500); - queue = buildRecurrentJobQueue(queueName, 5); - _driver.createQueue(queue); + JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5); currentJobNames.clear(); for (int i = 0; i <= 1; i++) { String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; @@ -245,10 +245,13 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) .setTargetPartitionStates(Sets.newHashSet(targetPartition)); String jobName = targetPartition.toLowerCase() + "Job" + i; - _driver.enqueueJob(queueName, jobName, job); + queueBuilder.enqueueJob(jobName, job); currentJobNames.add(jobName); } + _driver.createQueue(queueBuilder.build()); + + wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); // ensure jobs are started and completed @@ -269,12 +272,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { // Create a queue LOG.info("Starting job-queue: " + queueName); - JobQueue queue = buildRecurrentJobQueue(queueName); - _driver.createQueue(queue); + JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5); // Create and Enqueue jobs List<String> currentJobNames = new ArrayList<String>(); Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500)); + Thread.sleep(100); for (int i = 0; i <= 4; i++) { String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; @@ -285,9 +288,10 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { .setTargetPartitionStates(Sets.newHashSet(targetPartition)); String jobName = targetPartition.toLowerCase() + "Job" + i; LOG.info("Enqueuing job: " + jobName); - _driver.enqueueJob(queueName, jobName, job); + queueBuilder.enqueueJob(jobName, job); currentJobNames.add(i, jobName); } + _driver.createQueue(queueBuilder.build()); WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); @@ -360,8 +364,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { // Create a queue LOG.info("Starting job-queue: " + queueName); - JobQueue queue = buildRecurrentJobQueue(queueName); - _driver.createQueue(queue); + JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName); // create jobs List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>(); @@ -369,7 +372,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500)); final int JOB_COUNTS = 3; - for (int i = 0; i < JOB_COUNTS; i++) { String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; @@ -384,8 +386,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { // enqueue all jobs except last one for (int i = 0; i < JOB_COUNTS - 1; ++i) { LOG.info("Enqueuing job: " + jobNames.get(i)); - _driver.enqueueJob(queueName, jobNames.get(i), jobs.get(i)); + queueBuilder.enqueueJob(jobNames.get(i), jobs.get(i)); } + + _driver.createQueue(queueBuilder.build()); + String currentLastJob = jobNames.get(JOB_COUNTS - 2); WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); @@ -398,6 +403,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { // enqueue the last job LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1)); _driver.enqueueJob(queueName, jobNames.get(JOB_COUNTS - 1), jobs.get(JOB_COUNTS - 1)); + _driver.stop(queueName); // remove the last job _driver.deleteJob(queueName, jobNames.get(JOB_COUNTS - 1)); @@ -413,8 +419,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { // Create a queue LOG.info("Starting job-queue: " + queueName); - JobQueue queue = buildRecurrentJobQueue(queueName); - _driver.createQueue(queue); + JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName); // create jobs Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500)); @@ -431,8 +436,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { .setTargetPartitionStates(Sets.newHashSet("MASTER")); // enqueue both jobs - _driver.enqueueJob(queueName, "job1", job1); - _driver.enqueueJob(queueName, "job2", job2); + queueBuilder.enqueueJob("job1", job1); + queueBuilder.enqueueJob("job2", job2); + + _driver.createQueue(queueBuilder.build()); + WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java index f6fc53a..2ff8c56 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java @@ -156,7 +156,9 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase { String queueName = TestHelper.getTestMethodName(); - JobQueue queue = new JobQueue.Builder(queueName).parallelJobs(PARALLEL_COUNT).build(); + JobQueue.Builder queueBuild = new JobQueue.Builder(queueName); + queueBuild.parallelJobs(PARALLEL_COUNT); + JobQueue queue = queueBuild.build(); _driver.createQueue(queue); List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();
