http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 8e72f7a..6e6727c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -19,15 +19,6 @@ package org.apache.helix.task; * under the License. */ -import com.google.common.collect.Lists; -import org.I0Itec.zkclient.DataUpdater; -import org.apache.helix.*; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.*; -import org.apache.helix.model.builder.CustomModeISBuilder; -import org.apache.helix.model.builder.IdealStateBuilder; -import org.apache.log4j.Logger; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -39,6 +30,23 @@ import java.util.Map; import java.util.Set; import java.util.TimeZone; +import com.google.common.collect.Lists; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecord; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.builder.CustomModeISBuilder; +import org.apache.helix.model.builder.IdealStateBuilder; +import org.apache.log4j.Logger; + + /** * Custom rebalancer implementation for the {@code Workflow} in task state model. */ @@ -52,7 +60,7 @@ public class WorkflowRebalancer extends TaskRebalancer { LOG.debug("Computer Best Partition for workflow: " + workflow); // Fetch workflow configuration and context - WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflow); + WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflow); if (workflowCfg == null) { LOG.warn("Workflow configuration is NULL for " + workflow); return buildEmptyAssignment(workflow, currStateOutput); @@ -70,7 +78,7 @@ public class WorkflowRebalancer extends TaskRebalancer { TargetState targetState = workflowCfg.getTargetState(); if (targetState == TargetState.DELETE) { LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context."); - cleanupWorkflow(workflow, workflowCfg); + cleanupWorkflow(workflow, workflowCfg); return buildEmptyAssignment(workflow, currStateOutput); } @@ -124,7 +132,10 @@ public class WorkflowRebalancer extends TaskRebalancer { LOG.debug("Workflow " + workflow + " is not ready to be scheduled."); } - cleanExpiredJobs(workflowCfg, workflowCtx); + // clean up the expired jobs if it is a queue. + if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) { + purgeExpiredJobs(workflow, workflowCfg, workflowCtx); + } TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); return buildEmptyAssignment(workflow, currStateOutput); @@ -158,7 +169,8 @@ public class WorkflowRebalancer extends TaskRebalancer { // check ancestor job status if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) { - JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job); + JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job); + // Since the start time is calculated base on the time of completion of parent jobs for this // job, the calculated start time should only be calculate once. Persist the calculated time // in WorkflowContext znode. @@ -440,140 +452,61 @@ public class WorkflowRebalancer extends TaskRebalancer { } /** - * Cleans up workflow configs and workflow contexts associated with this workflow, - * including all job-level configs and context, plus workflow-level information. + * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow + * contexts associated with this workflow, and all jobs information, including their configs, + * context, IS and EV. */ private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) { LOG.info("Cleaning up workflow: " + workflow); - HelixDataAccessor accessor = _manager.getHelixDataAccessor(); - /* - if (workflowCtx != null && workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { - LOG.error("Workflow " + workflow + " has not completed, abort the clean up task."); - return; - }*/ - - for (String job : workflowcfg.getJobDag().getAllNodes()) { - cleanupJob(job, workflow); - } - - // clean up workflow-level info if this was the last in workflow if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) { - // clean up IS & EV - cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow); - - // delete workflow config - PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow); - if (accessor.getProperty(workflowCfgKey) != null) { - if (!accessor.removeProperty(workflowCfgKey)) { - LOG.error(String.format( - "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix.", - workflow, workflowCfgKey)); - } + Set<String> jobs = workflowcfg.getJobDag().getAllNodes(); + // Remove all pending timer tasks for this workflow if exists + _scheduledRebalancer.removeScheduledRebalance(workflow); + for (String job : jobs) { + _scheduledRebalancer.removeScheduledRebalance(job); } - // Delete workflow context - LOG.info("Removing workflow context: " + workflow); - if (!TaskUtil.removeWorkflowContext(_manager, workflow)) { - LOG.error(String.format( - "Error occurred while trying to clean up workflow %s. Aborting further clean up steps.", - workflow)); + if (!TaskUtil.removeWorkflow(_manager, workflow, jobs)) { + LOG.warn("Failed to clean up workflow " + workflow); } - - // Remove pending timer task for this workflow if exists - _scheduledRebalancer.removeScheduledRebalance(workflow); + } else { + LOG.info("Did not clean up workflow " + workflow + + " because neither the workflow is non-terminable nor is set to DELETE."); } } - /** - * Cleans up job configs and job contexts associated with this job, - * including all job-level configs and context, plus the job info in the workflow context. + * Clean up all jobs that are COMPLETED and passes its expiry time. + * + * @param workflowConfig + * @param workflowContext */ - private void cleanupJob(final String job, String workflow) { - LOG.info("Cleaning up job: " + job + " in workflow: " + workflow); - HelixDataAccessor accessor = _manager.getHelixDataAccessor(); - - // Remove any idealstate and externalView. - cleanupIdealStateExtView(accessor, job); - - // Remove DAG references in workflow - PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow); - DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() { - @Override public ZNRecord update(ZNRecord currentData) { - if (currentData != null) { - JobDag jobDag = JobDag.fromJson( - currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name())); - for (String child : jobDag.getDirectChildren(job)) { - jobDag.getChildrenToParents().get(child).remove(job); - } - for (String parent : jobDag.getDirectParents(job)) { - jobDag.getParentsToChildren().get(parent).remove(job); - } - jobDag.getChildrenToParents().remove(job); - jobDag.getParentsToChildren().remove(job); - jobDag.getAllNodes().remove(job); - try { - currentData - .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson()); - } catch (Exception e) { - LOG.error("Could not update DAG for job: " + job, e); - } - } else { - LOG.error("Could not update DAG for job: " + job + " ZNRecord is null."); - } - return currentData; - } - }; - accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover, - AccessOption.PERSISTENT); - - // Delete job configs. - PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(accessor, job); - if (accessor.getProperty(cfgKey) != null) { - if (!accessor.removeProperty(cfgKey)) { - LOG.error(String.format( - "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.", - job, cfgKey)); - } - } - - // Delete job context - // For recurring workflow, it's OK if the node doesn't exist. - if (!TaskUtil.removeJobContext(_manager, job)) { - LOG.warn(String.format("Error occurred while trying to clean up job %s.", job)); - } - - LOG.info(String.format("Successfully cleaned up job context %s.", job)); - - _scheduledRebalancer.removeScheduledRebalance(job); - } - - private void cleanExpiredJobs(WorkflowConfig workflowConfig, WorkflowContext workflowContext) { - if (workflowContext == null) { + // TODO: run this in a separate thread. + // Get all jobConfigs & jobContext from ClusterCache. + protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig, + WorkflowContext workflowContext) { + if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) { return; } - Map<String, TaskState> jobStates = workflowContext.getJobStates(); - long newTimeToClean = Long.MAX_VALUE; - for (String job : workflowConfig.getJobDag().getAllNodes()) { - JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job); - JobContext jobContext = TaskUtil.getJobContext(_manager, job); - // There is no ABORTED state for JobQueue Job. The job will die with workflow - if (jobContext != null && jobStates.containsKey(job) && ( - jobStates.get(job) == TaskState.COMPLETED || jobStates.get(job) == TaskState.FAILED)) { - if (System.currentTimeMillis() >= jobConfig.getExpiry() + jobContext.getFinishTime()) { - cleanupJob(job, workflowConfig.getWorkflowId()); - } else { - newTimeToClean = - Math.min(newTimeToClean, jobConfig.getExpiry() + jobContext.getFinishTime()); - } - } + Set<String> expiredJobs = TaskUtil + .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), + workflowConfig, workflowContext); + for (String job : expiredJobs) { + _scheduledRebalancer.removeScheduledRebalance(job); + } + if (!TaskUtil + .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), + workflow, expiredJobs, true)) { + LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow); } - if (newTimeToClean < Long.MAX_VALUE && newTimeToClean < _scheduledRebalancer - .getRebalanceTime(workflowConfig.getWorkflowId())) { - _scheduledRebalancer - .scheduleRebalance(_manager, workflowConfig.getWorkflowId(), newTimeToClean); + long currentTime = System.currentTimeMillis(); + long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL; + workflowContext.setLastJobPurgeTime(currentTime); + long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow); + if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) { + _scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime); } }
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java index 7688017..ef8f971 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java @@ -116,7 +116,7 @@ public class TaskAdmin { driver.flushQueue(workflow); break; case clean: - driver.cleanupJobQueue(workflow); + driver.cleanupQueue(workflow); break; default: throw new IllegalArgumentException("Unknown command " + args[0]); http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java index 641f13a..6036732 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java @@ -212,9 +212,10 @@ public class TaskTestUtil { } public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart, - int failureThreshold) { + int failureThreshold, int capacity) { WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder(); workflowCfgBuilder.setExpiry(120000); + workflowCfgBuilder.setCapacity(capacity); Calendar cal = Calendar.getInstance(); cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60); @@ -228,8 +229,17 @@ public class TaskTestUtil { return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build()); } + public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart, + int failureThreshold) { + return buildJobQueue(jobQueueName, delayStart, failureThreshold, 500); + } + public static JobQueue.Builder buildJobQueue(String jobQueueName) { - return buildJobQueue(jobQueueName, 0, 0); + return buildJobQueue(jobQueueName, 0, 0, 500); + } + + public static JobQueue.Builder buildJobQueue(String jobQueueName, int capacity) { + return buildJobQueue(jobQueueName, 0, 0, capacity); } public static WorkflowContext buildWorkflowContext(String workflowResource, http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java index 71fed49..a0a1617 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java @@ -52,7 +52,7 @@ public class TestJobQueueCleanUp extends TaskTestBase { _driver.start(builder.build()); _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4), TaskState.FAILED); - _driver.cleanupJobQueue(queueName); + _driver.cleanupQueue(queueName); Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 0); } @@ -71,7 +71,7 @@ public class TestJobQueueCleanUp extends TaskTestBase { _driver.start(builder.build()); _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 3), TaskState.IN_PROGRESS); - _driver.cleanupJobQueue(queueName); + _driver.cleanupQueue(queueName); Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2); } } http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java index 16df022..4d0c3c6 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java @@ -19,6 +19,9 @@ package org.apache.helix.task; * under the License. */ +import java.util.HashSet; +import java.util.Set; +import org.apache.helix.HelixException; import org.apache.helix.TestHelper; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.integration.task.MockTask; @@ -39,34 +42,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase { @Test public void testCleanExpiredJobs() throws Exception { - String workflowName = TestHelper.getTestMethodName(); - JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName); + String queue = TestHelper.getTestMethodName(); + JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue); JobConfig.Builder jobBuilder = new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2) .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L); long startTime = System.currentTimeMillis(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 8; i++) { builder.enqueueJob("JOB" + i, jobBuilder); - TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB" + i), + } + + for (int i = 0; i < 8; i++) { + TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i), TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED)); } + for (int i = 4; i < 6; i++) { + TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i), + TaskTestUtil + .buildJobContext(startTime, startTime + 100000, TaskPartitionState.COMPLETED)); + } + WorkflowContext workflowContext = TaskTestUtil - .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.COMPLETED, - TaskState.FAILED, TaskState.ABORTED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED); + .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED, + TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.COMPLETED, + TaskState.COMPLETED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED); + + Set<String> jobsLeft = new HashSet<String>(); + jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 1)); + jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 2)); + jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 4)); + jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 5)); + jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 6)); + jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 7)); + _driver.start(builder.build()); _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor()); - TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext); + TaskUtil.setWorkflowContext(_manager, queue, workflowContext); TaskTestUtil.calculateBestPossibleState(_cache, _manager); - WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName); - Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 3); + WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue); + Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft); + workflowContext = _driver.getWorkflowContext(queue); + Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime + && workflowContext.getLastJobPurgeTime() < System.currentTimeMillis()); } - @Test void testNotCleanJobsDueToParentFail() throws Exception { - String workflowName = TestHelper.getTestMethodName(); - JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName); + @Test + void testNotCleanJobsDueToParentFail() throws Exception { + String queue = TestHelper.getTestMethodName(); + JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue); JobConfig.Builder jobBuilder = new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2) @@ -76,17 +102,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase { builder.enqueueJob("JOB0", jobBuilder); builder.enqueueJob("JOB1", jobBuilder); builder.addParentChildDependency("JOB0", "JOB1"); - TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB0"), + TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB0"), TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED)); WorkflowContext workflowContext = TaskTestUtil - .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.FAILED, + .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.FAILED, TaskState.FAILED); _driver.start(builder.build()); _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor()); - TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext); + TaskUtil.setWorkflowContext(_manager, queue, workflowContext); TaskTestUtil.calculateBestPossibleState(_cache, _manager); - WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName); - Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 1); + WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue); + Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 2); + } + + @Test + void testNotCleanJobsThroughEnqueueJob() throws Exception { + int capacity = 5; + String queue = TestHelper.getTestMethodName(); + JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue, capacity); + JobConfig.Builder jobBuilder = + new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2) + .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L); + + long startTime = System.currentTimeMillis(); + for (int i = 0; i < capacity; i++) { + builder.enqueueJob("JOB" + i, jobBuilder); + } + + _driver.start(builder.build()); + try { + // should fail here since the queue is full. + _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder); + Assert.fail("Queue is not full."); + } catch (HelixException e) { + Assert.assertTrue(e.getMessage().contains("queue is full")); + } + + for (int i = 0; i < capacity; i++) { + TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i), + TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED)); + } + + WorkflowContext workflowContext = TaskTestUtil + .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED, + TaskState.COMPLETED, TaskState.FAILED, TaskState.IN_PROGRESS); + TaskUtil.setWorkflowContext(_manager, queue, workflowContext); + + _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder); + + WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue); + Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), capacity - 1); } }