http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 8e72f7a..6e6727c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -19,15 +19,6 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.collect.Lists;
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.*;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.*;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
-import org.apache.log4j.Logger;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -39,6 +30,23 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 
+import com.google.common.collect.Lists;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.IdealStateBuilder;
+import org.apache.log4j.Logger;
+
+
 /**
  * Custom rebalancer implementation for the {@code Workflow} in task state model.
  */
@@ -52,7 +60,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     LOG.debug("Computer Best Partition for workflow: " + workflow);
 
     // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflow);
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflow);
     if (workflowCfg == null) {
       LOG.warn("Workflow configuration is NULL for " + workflow);
       return buildEmptyAssignment(workflow, currStateOutput);
@@ -70,7 +78,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
       LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the 
workflow context.");
-      cleanupWorkflow(workflow, workflowCfg);
+      cleanupWorkflow(workflow,  workflowCfg);
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
@@ -124,7 +132,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
       LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
     }
 
-    cleanExpiredJobs(workflowCfg, workflowCtx);
+    // Clean up the expired jobs if the workflow is a job queue or is not terminable.
+    if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
+      purgeExpiredJobs(workflow, workflowCfg, workflowCtx);
+    }
 
     TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     return buildEmptyAssignment(workflow, currStateOutput);
@@ -158,7 +169,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
       // check ancestor job status
       if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
-        JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+        JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job);
+
        // Since the start time is calculated based on the time of completion of parent jobs for
        // this job, the calculated start time should only be calculated once. Persist the
        // calculated time in the WorkflowContext znode.
@@ -440,140 +452,61 @@ public class WorkflowRebalancer extends TaskRebalancer {
   }
 
   /**
-   * Cleans up workflow configs and workflow contexts associated with this workflow,
-   * including all job-level configs and context, plus workflow-level information.
+   * Clean up a workflow. This removes the workflow config, idealstate, externalview and
+   * workflow contexts associated with this workflow, and all job information, including
+   * their configs, contexts, idealstates and externalviews.
    */
   private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
     LOG.info("Cleaning up workflow: " + workflow);
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
 
-    /*
-    if (workflowCtx != null && workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-      LOG.error("Workflow " + workflow + " has not completed, abort the clean up task.");
-      return;
-    }*/
-
-    for (String job : workflowcfg.getJobDag().getAllNodes()) {
-      cleanupJob(job, workflow);
-    }
-
-    // clean up workflow-level info if this was the last in workflow
     if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
-      // clean up IS & EV
-      cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow);
-
-      // delete workflow config
-      PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
-      if (accessor.getProperty(workflowCfgKey) != null) {
-        if (!accessor.removeProperty(workflowCfgKey)) {
-          LOG.error(String.format(
-              "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix.",
-              workflow, workflowCfgKey));
-        }
+      Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
+      // Remove all pending timer tasks for this workflow, if any exist
+      _scheduledRebalancer.removeScheduledRebalance(workflow);
+      for (String job : jobs) {
+        _scheduledRebalancer.removeScheduledRebalance(job);
       }
-      // Delete workflow context
-      LOG.info("Removing workflow context: " + workflow);
-      if (!TaskUtil.removeWorkflowContext(_manager, workflow)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up workflow %s. Aborting further clean up steps.",
-            workflow));
+      if (!TaskUtil.removeWorkflow(_manager, workflow, jobs)) {
+        LOG.warn("Failed to clean up workflow " + workflow);
       }
-
-      // Remove pending timer task for this workflow if exists
-      _scheduledRebalancer.removeScheduledRebalance(workflow);
+    } else {
+      LOG.info("Did not clean up workflow " + workflow
+          + " because neither the workflow is non-terminable nor is set to 
DELETE.");
     }
   }
 
-
   /**
-   * Cleans up job configs and job contexts associated with this job,
-   * including all job-level configs and context, plus the job info in the workflow context.
+   * Clean up all jobs that are COMPLETED and have passed their expiry time.
+   *
+   * @param workflow the name of the workflow to purge
+   * @param workflowConfig the configuration of the workflow
+   * @param workflowContext the context of the workflow
    */
-  private void cleanupJob(final String job, String workflow) {
-    LOG.info("Cleaning up job: " + job + " in workflow: " + workflow);
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Remove any idealstate and externalView.
-    cleanupIdealStateExtView(accessor, job);
-
-    // Remove DAG references in workflow
-    PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
-        if (currentData != null) {
-          JobDag jobDag = JobDag.fromJson(
-              currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
-          for (String child : jobDag.getDirectChildren(job)) {
-            jobDag.getChildrenToParents().get(child).remove(job);
-          }
-          for (String parent : jobDag.getDirectParents(job)) {
-            jobDag.getParentsToChildren().get(parent).remove(job);
-          }
-          jobDag.getChildrenToParents().remove(job);
-          jobDag.getParentsToChildren().remove(job);
-          jobDag.getAllNodes().remove(job);
-          try {
-            currentData
-                .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
-          } catch (Exception e) {
-            LOG.error("Could not update DAG for job: " + job, e);
-          }
-        } else {
-          LOG.error("Could not update DAG for job: " + job + " ZNRecord is 
null.");
-        }
-        return currentData;
-      }
-    };
-    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
-        AccessOption.PERSISTENT);
-
-    // Delete job configs.
-    PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(accessor, job);
-    if (accessor.getProperty(cfgKey) != null) {
-      if (!accessor.removeProperty(cfgKey)) {
-        LOG.error(String.format(
-            "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.",
-            job, cfgKey));
-      }
-    }
-
-    // Delete job context
-    // For recurring workflow, it's OK if the node doesn't exist.
-    if (!TaskUtil.removeJobContext(_manager, job)) {
-      LOG.warn(String.format("Error occurred while trying to clean up job %s.", job));
-    }
-
-    LOG.info(String.format("Successfully cleaned up job context %s.", job));
-
-    _scheduledRebalancer.removeScheduledRebalance(job);
-  }
-
-  private void cleanExpiredJobs(WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
-    if (workflowContext == null) {
+  // TODO: run this in a separate thread.
+  // Get all jobConfigs & jobContext from ClusterCache.
+  protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) {
       return;
     }
 
-    Map<String, TaskState> jobStates = workflowContext.getJobStates();
-    long newTimeToClean = Long.MAX_VALUE;
-    for (String job : workflowConfig.getJobDag().getAllNodes()) {
-      JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
-      JobContext jobContext = TaskUtil.getJobContext(_manager, job);
-      // There is no ABORTED state for JobQueue Job. The job will die with workflow
-      if (jobContext != null && jobStates.containsKey(job) && (
-          jobStates.get(job) == TaskState.COMPLETED || jobStates.get(job) == TaskState.FAILED)) {
-        if (System.currentTimeMillis() >= jobConfig.getExpiry() + jobContext.getFinishTime()) {
-          cleanupJob(job, workflowConfig.getWorkflowId());
-        } else {
-          newTimeToClean =
-              Math.min(newTimeToClean, jobConfig.getExpiry() + jobContext.getFinishTime());
-        }
-      }
+    Set<String> expiredJobs = TaskUtil
+        .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+            workflowConfig, workflowContext);
+    for (String job : expiredJobs) {
+      _scheduledRebalancer.removeScheduledRebalance(job);
+    }
+    if (!TaskUtil
+        .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+            workflow, expiredJobs, true)) {
+      LOG.warn("Failed to clean up expired and completed jobs from workflow " 
+ workflow);
     }
 
-    if (newTimeToClean < Long.MAX_VALUE && newTimeToClean < _scheduledRebalancer
-        .getRebalanceTime(workflowConfig.getWorkflowId())) {
-      _scheduledRebalancer
-          .scheduleRebalance(_manager, workflowConfig.getWorkflowId(), newTimeToClean);
+    long currentTime = System.currentTimeMillis();
+    long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL;
+    workflowContext.setLastJobPurgeTime(currentTime);
+    long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      _scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime);
     }
   }
 

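The purgeExpiredJobs() change above throttles cleanup with JOB_PURGE_INTERVAL and re-arms a scheduled rebalance so a purge still runs even when no pipeline event fires in time. Below is a minimal standalone sketch of that throttle-and-reschedule pattern, assuming a 30-second interval; the class, scheduler, and purge method are hypothetical stand-ins, not Helix APIs.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class PurgeThrottleSketch {
      // Assumed value; the real JOB_PURGE_INTERVAL is defined elsewhere in Helix.
      private static final long JOB_PURGE_INTERVAL_MS = 30_000L;

      private final ScheduledExecutorService scheduler =
          Executors.newSingleThreadScheduledExecutor();
      private volatile long lastPurgeTime = 0L;

      // Called on every rebalance-like event; purges at most once per interval
      // and schedules a follow-up so purging continues without new events.
      public void maybePurge(String workflow) {
        long now = System.currentTimeMillis();
        if (lastPurgeTime + JOB_PURGE_INTERVAL_MS > now) {
          return; // purged recently; wait for the scheduled follow-up
        }
        purgeExpiredJobs(workflow); // stand-in for the real cleanup call
        lastPurgeTime = now;
        scheduler.schedule(() -> maybePurge(workflow),
            JOB_PURGE_INTERVAL_MS, TimeUnit.MILLISECONDS);
      }

      private void purgeExpiredJobs(String workflow) {
        System.out.println("purging expired jobs for " + workflow);
      }
    }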
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
index 7688017..ef8f971 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
@@ -116,7 +116,7 @@ public class TaskAdmin {
         driver.flushQueue(workflow);
         break;
       case clean:
-        driver.cleanupJobQueue(workflow);
+        driver.cleanupQueue(workflow);
         break;
       default:
         throw new IllegalArgumentException("Unknown command " + args[0]);

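The clean command now delegates to the renamed TaskDriver method: cleanupQueue() replaces cleanupJobQueue(). A hedged usage sketch for callers migrating to the new name, assuming an already-connected HelixManager:

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskDriver;

    public class QueueCleanupExample {
      // Per the TestJobQueueCleanUp changes later in this commit, cleanupQueue
      // drops terminal (completed/failed) jobs from the queue's DAG while
      // leaving in-progress and not-started jobs in place.
      public static void cleanup(HelixManager manager, String queueName) {
        TaskDriver driver = new TaskDriver(manager);
        driver.cleanupQueue(queueName);
      }
    }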
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 641f13a..6036732 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -212,9 +212,10 @@ public class TaskTestUtil {
   }
 
   public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
-      int failureThreshold) {
+      int failureThreshold, int capacity) {
     WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
     workflowCfgBuilder.setExpiry(120000);
+    workflowCfgBuilder.setCapacity(capacity);
 
     Calendar cal = Calendar.getInstance();
     cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -228,8 +229,17 @@ public class TaskTestUtil {
     return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
   }
 
+  public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
+      int failureThreshold) {
+    return buildJobQueue(jobQueueName, delayStart, failureThreshold, 500);
+  }
+
   public static JobQueue.Builder buildJobQueue(String jobQueueName) {
-    return buildJobQueue(jobQueueName, 0, 0);
+    return buildJobQueue(jobQueueName, 0, 0, 500);
+  }
+
+  public static JobQueue.Builder buildJobQueue(String jobQueueName, int capacity) {
+    return buildJobQueue(jobQueueName, 0, 0, capacity);
   }
 
   public static WorkflowContext buildWorkflowContext(String workflowResource,

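The new capacity argument feeds WorkflowConfig.Builder.setCapacity(), and the pre-existing overloads now delegate with a default of 500. A quick usage sketch of the overloads added above; the queue names are illustrative:

    import org.apache.helix.integration.task.TaskTestUtil;
    import org.apache.helix.task.JobQueue;

    public class BuildQueueExample {
      public static void main(String[] args) {
        // Full form: delayStart, failureThreshold, capacity.
        JobQueue.Builder explicit = TaskTestUtil.buildJobQueue("queueA", 0, 0, 10);
        // Name-plus-capacity convenience overload.
        JobQueue.Builder capped = TaskTestUtil.buildJobQueue("queueB", 5);
        // Older overloads keep working, with capacity defaulting to 500.
        JobQueue.Builder defaulted = TaskTestUtil.buildJobQueue("queueC");
      }
    }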
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index 71fed49..a0a1617 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -52,7 +52,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     _driver.start(builder.build());
     _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
         TaskState.FAILED);
-    _driver.cleanupJobQueue(queueName);
+    _driver.cleanupQueue(queueName);
     Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 0);
   }
 
@@ -71,7 +71,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     _driver.start(builder.build());
     _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 3),
         TaskState.IN_PROGRESS);
-    _driver.cleanupJobQueue(queueName);
+    _driver.cleanupQueue(queueName);
     Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
   }
 }

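Both tests pin down the same rule: cleanupQueue() removes only jobs that have reached a terminal state, which is why the first queue empties while the second keeps its two unfinished jobs. A minimal sketch of that selection rule, assuming terminal means COMPLETED, FAILED, or ABORTED; this helper is illustrative, not Helix's implementation:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.helix.task.TaskState;

    public class TerminalJobFilter {
      // Only terminal jobs qualify for removal, so IN_PROGRESS and
      // NOT_STARTED jobs survive a cleanup pass.
      public static Set<String> removableJobs(Map<String, TaskState> jobStates) {
        Set<String> removable = new HashSet<String>();
        for (Map.Entry<String, TaskState> entry : jobStates.entrySet()) {
          TaskState state = entry.getValue();
          if (state == TaskState.COMPLETED || state == TaskState.FAILED
              || state == TaskState.ABORTED) {
            removable.add(entry.getKey());
          }
        }
        return removable;
      }
    }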
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index 16df022..4d0c3c6 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -19,6 +19,9 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.task.MockTask;
@@ -39,34 +42,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
 
   @Test
   public void testCleanExpiredJobs() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+    String queue = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
             .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
             .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
 
     long startTime = System.currentTimeMillis();
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 8; i++) {
       builder.enqueueJob("JOB" + i, jobBuilder);
-      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB" + i),
+    }
+
+    for (int i = 0; i < 8; i++) {
+      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
           TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
     }
 
+    for (int i = 4; i < 6; i++) {
+      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
+          TaskTestUtil
+              .buildJobContext(startTime, startTime + 100000, TaskPartitionState.COMPLETED));
+    }
+
     WorkflowContext workflowContext = TaskTestUtil
-        .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
-            TaskState.FAILED, TaskState.ABORTED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+        .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.COMPLETED,
+            TaskState.COMPLETED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+            TaskState.COMPLETED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+
+    Set<String> jobsLeft = new HashSet<String>();
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 1));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 2));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 4));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 5));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 6));
+    jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 7));
+
     _driver.start(builder.build());
     _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
-    TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+    TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
     TaskTestUtil.calculateBestPossibleState(_cache, _manager);
-    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
-    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 3);
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft);
+    workflowContext = _driver.getWorkflowContext(queue);
+    Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime
+        && workflowContext.getLastJobPurgeTime() < System.currentTimeMillis());
   }
 
-  @Test void testNotCleanJobsDueToParentFail() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+  @Test
+  void testNotCleanJobsDueToParentFail() throws Exception {
+    String queue = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
             .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
@@ -76,17 +102,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
     builder.enqueueJob("JOB0", jobBuilder);
     builder.enqueueJob("JOB1", jobBuilder);
     builder.addParentChildDependency("JOB0", "JOB1");
-    TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB0"),
+    TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB0"),
         TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
 
     WorkflowContext workflowContext = TaskTestUtil
-        .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.FAILED,
+        .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.FAILED,
             TaskState.FAILED);
     _driver.start(builder.build());
     _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
-    TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+    TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
     TaskTestUtil.calculateBestPossibleState(_cache, _manager);
-    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
-    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 1);
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 2);
+  }
+
+  @Test
+  void testNotCleanJobsThroughEnqueueJob() throws Exception {
+    int capacity = 5;
+    String queue = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue, capacity);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
+
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < capacity; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+    }
+
+    _driver.start(builder.build());
+    try {
+      // should fail here since the queue is full.
+      _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
+      Assert.fail("Queue is not full.");
+    } catch (HelixException e) {
+      Assert.assertTrue(e.getMessage().contains("queue is full"));
+    }
+
+    for (int i = 0; i < capacity; i++) {
+      TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
+          TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
+    }
+
+    WorkflowContext workflowContext = TaskTestUtil
+        .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+            TaskState.COMPLETED, TaskState.FAILED, TaskState.IN_PROGRESS);
+    TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
+
+    _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
+
+    WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+    Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), capacity - 1);
   }
 }

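testNotCleanJobsThroughEnqueueJob also exercises the enqueue side: a full queue rejects new jobs with a HelixException until expired jobs are purged, after which the enqueue succeeds. A minimal sketch of such a capacity guard, assuming the exception message contains "queue is full" as the test expects; this is illustrative, not Helix's enqueue code:

    import org.apache.helix.HelixException;

    public class CapacityGuard {
      // Rejects an enqueue once the queue already holds 'capacity' jobs;
      // callers purge expired jobs and retry, mirroring the test above.
      public static void checkCapacity(int currentJobCount, int capacity) {
        if (capacity > 0 && currentJobCount >= capacity) {
          throw new HelixException("Failed to enqueue job: the queue is full.");
        }
      }
    }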