This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b8be8f63a [GOBBLIN-1784] Only clean dags from the dag manager if a 
flow event is received (#3641)
b8be8f63a is described below

commit b8be8f63a7b8d6b5d1feaf9e47f022cded2ac96b
Author: William Lo <[email protected]>
AuthorDate: Thu Feb 16 12:19:19 2023 -0800

    [GOBBLIN-1784] Only clean dags from the dag manager if a flow event is 
received (#3641)
    
    * Only clean dags from the dag manager if a flow event is received
    
    * Cleanup
    
    * Fix DagManagerFlowTest with cleanup changes
    
    * Address comments with more accurate logs; fix a bug where retries did not 
reset the flow event to be emitted
---
 .../gobblin/service/modules/flowgraph/Dag.java     |   4 +-
 .../service/modules/orchestration/DagManager.java  | 125 ++++----
 .../modules/orchestration/DagManagerMetrics.java   |  14 +-
 .../modules/orchestration/DagManagerUtils.java     |   6 +-
 .../modules/orchestration/DagManagerFlowTest.java  |  23 +-
 .../modules/orchestration/DagManagerTest.java      | 339 ++++++++++++++-------
 6 files changed, 326 insertions(+), 185 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 40a56bc25..edb19d8d1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -35,7 +35,6 @@ import lombok.Setter;
 
 import org.apache.gobblin.annotation.Alpha;
 
-
 /**
  * An implementation of Dag. Assumes that nodes have unique values. Nodes with 
duplicate values will produce
  * unpredictable behavior.
@@ -53,6 +52,9 @@ public class Dag<T> {
   private String message;
   @Setter
   private String flowEvent;
+  // Keep track of when the final flow status is emitted, in milliseconds to 
avoid many duplicate events
+  @Setter @Getter
+  private long eventEmittedTimeMillis = -1;
 
   public Dag(List<DagNode<T>> dagNodes) {
     this.nodes = dagNodes;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 6f26dc3e5..6251ac1d2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,10 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.codahale.metrics.Meter;
-import com.google.common.base.Joiner;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -43,15 +39,19 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -77,6 +77,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KillFlowEvent;
@@ -129,6 +130,8 @@ public class DagManager extends AbstractIdleService {
   private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
   private static final String FAILED_DAG_RETENTION_TIME = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
   private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
+  // Re-emit the final flow status if not detected within 5 minutes
+  private static final long DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
   public static final String FAILED_DAG_POLLING_INTERVAL = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
   public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
   public static final String DAG_MANAGER_HEARTBEAT = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagManager.heartbeat-%s";
@@ -477,9 +480,8 @@ public class DagManager extends AbstractIdleService {
     // dagToJobs holds a map of dagId to running jobs of that dag
     final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
     final Map<String, Long> dagToSLA = new HashMap<>();
-    private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
-    private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
     private final MetricContext metricContext;
+    private final Set<String> dagIdstoClean = new HashSet<>();
     private final Optional<EventSubmitter> eventSubmitter;
     private final Optional<Timer> jobStatusPolledTimer;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
@@ -760,7 +762,6 @@ public class DagManager extends AbstractIdleService {
      * Proceed the execution of each dag node based on job status.
      */
     private void pollAndAdvanceDag() throws IOException, ExecutionException, 
InterruptedException {
-      this.failedDagIdsFinishRunning.clear();
       Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = 
Maps.newHashMap();
       List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
 
@@ -806,6 +807,7 @@ public class DagManager extends AbstractIdleService {
           if (jobStatus != null && jobStatus.isShouldRetry()) {
             log.info("Retrying job: {}, current attempts: {}, max attempts: 
{}", DagManagerUtils.getFullyQualifiedJobName(node),
                 jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+            this.jobToDag.get(node).setFlowEvent(null);
             submitJob(node);
           }
         } catch (Exception e) {
@@ -1048,9 +1050,6 @@ public class DagManager extends AbstractIdleService {
     /**
      * Method that defines the actions to be performed when a job finishes 
either successfully or with failure.
      * This method updates the state of the dag and performs clean up actions 
as necessary.
-     * TODO : Dag should have a status field, like JobExecutionPlan has. This 
method should update that field,
-     *        which should be used by cleanup(). It may also remove the need 
of failedDagIdsFinishRunning,
-     *        failedDagIdsFinishAllPossible.
      */
     private Map<String, Set<DagNode<JobExecutionPlan>>> 
onJobFinish(DagNode<JobExecutionPlan> dagNode)
         throws IOException {
@@ -1067,19 +1066,11 @@ public class DagManager extends AbstractIdleService {
       switch (jobStatus) {
         case FAILED:
           dag.setMessage("Flow failed because job " + jobName + " failed");
-          if (DagManagerUtils.getFailureOption(dag) == 
FailureOption.FINISH_RUNNING) {
-            this.failedDagIdsFinishRunning.add(dagId);
-          } else {
-            this.failedDagIdsFinishAllPossible.add(dagId);
-          }
+          dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
           dagManagerMetrics.incrementExecutorFailed(dagNode);
           return Maps.newHashMap();
         case CANCELLED:
-          if (DagManagerUtils.getFailureOption(dag) == 
FailureOption.FINISH_RUNNING) {
-            this.failedDagIdsFinishRunning.add(dagId);
-          } else {
-            this.failedDagIdsFinishAllPossible.add(dagId);
-          }
+          dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
           return Maps.newHashMap();
         case COMPLETE:
           dagManagerMetrics.incrementExecutorSuccess(dagNode);
@@ -1117,46 +1108,73 @@ public class DagManager extends AbstractIdleService {
      * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
      */
     private void cleanUp() {
-      List<String> dagIdstoClean = new ArrayList<>();
-      //Clean up failed dags
-      for (String dagId : this.failedDagIdsFinishRunning) {
-        //Skip monitoring of any other jobs of the failed dag.
-        LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
-        while (!dagNodeList.isEmpty()) {
-          DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
-          deleteJobState(dagId, dagNode);
-        }
-        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
-        String status = TimingEvent.FlowTimings.FLOW_FAILED;
-        addFailedDag(dagId, dag);
-        log.info("Dag {} has finished with status {}; Cleaning up dag from the 
state store.", dagId, status);
-        // send an event before cleaning up dag
-        DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), status);
-        dagIdstoClean.add(dagId);
-      }
+      // Approximate the time when the flow events are emitted to account for 
delay when the flow event is received by the job monitor
+      long cleanUpProcessingTime = System.currentTimeMillis();
 
       // Remove dags that are finished and emit their appropriate metrics
       for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair : 
this.dags.entrySet()) {
         String dagId = dagIdKeyPair.getKey();
+        // On service restart, we repopulate the dags that are waiting to be 
cleaned up
+        if (dagIdstoClean.contains(dagId)) {
+          continue;
+        }
         Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
-        if (!hasRunningJobs(dagId) && 
!this.failedDagIdsFinishRunning.contains(dagId)) {
-          String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
-          if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
-            status = TimingEvent.FlowTimings.FLOW_FAILED;
-            addFailedDag(dagId, dag);
-            this.failedDagIdsFinishAllPossible.remove(dagId);
+        if ((TimingEvent.FlowTimings.FLOW_FAILED.equals(dag.getFlowEvent()) || 
TimingEvent.FlowTimings.FLOW_CANCELLED.equals(dag.getFlowEvent())) &&
+            DagManagerUtils.getFailureOption(dag) == 
FailureOption.FINISH_RUNNING) {
+          //Skip monitoring of any other jobs of the failed dag.
+          LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
+          while (!dagNodeList.isEmpty()) {
+            DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+            deleteJobState(dagId, dagNode);
+          }
+        }
+        if (!hasRunningJobs(dagId)) {
+          // Collect all the dagIds that are finished
+          this.dagIdstoClean.add(dagId);
+          if (dag.getFlowEvent() == null) {
+            // If the dag flow event is not set, then it is successful
+            dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
           } else {
-            
dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
+            addFailedDag(dagId, dag);
           }
-          log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, status);
           // send an event before cleaning up dag
-          DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), status);
-          dagIdstoClean.add(dagId);
+          DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), dag.getFlowEvent());
+          dag.setEventEmittedTimeMillis(cleanUpProcessingTime);
         }
       }
 
-      for (String dagId: dagIdstoClean) {
-        cleanUpDag(dagId);
+      // Only clean up dags after the job status monitor processed the flow 
event
+      for (Iterator<String> dagIdIterator = this.dagIdstoClean.iterator(); 
dagIdIterator.hasNext();) {
+        String dagId = dagIdIterator.next();
+        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+        JobStatus flowStatus = pollFlowStatus(dag);
+        if (flowStatus != null && 
FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+          FlowId flowId = DagManagerUtils.getFlowId(dag);
+          switch(dag.getFlowEvent()) {
+            case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
+              this.dagManagerMetrics.emitFlowSuccessMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
FlowState.SUCCESSFUL);
+              break;
+            case TimingEvent.FlowTimings.FLOW_FAILED:
+              this.dagManagerMetrics.emitFlowFailedMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
FlowState.FAILED);
+              break;
+            case TimingEvent.FlowTimings.FLOW_CANCELLED:
+              this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
FlowState.FAILED);
+              break;
+            default:
+              log.warn("Unexpected flow event {} for dag {}", 
dag.getFlowEvent(), dagId);
+          }
+          log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, dag.getFlowEvent());
+          cleanUpDag(dagId);
+          dagIdIterator.remove();
+        } else if (cleanUpProcessingTime > dag.getEventEmittedTimeMillis() + 
DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS) {
+          // Re-emit the flow event if the flow status has not been processed 
within the DagFlowStatusTolerance time
+          DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, 
dag.getFlowEvent());
+        } else {
+          log.info("Waiting for flow event {} to be emitted before cleaning up 
dag {}", dag.getFlowEvent(), dagId);
+        }
       }
     }
 
@@ -1164,7 +1182,6 @@ public class DagManager extends AbstractIdleService {
      * Add a dag to failed dag state store
      */
     private synchronized void addFailedDag(String dagId, Dag<JobExecutionPlan> 
dag) {
-      FlowId flowId = DagManagerUtils.getFlowId(dag);
       try {
         log.info("Adding dag " + dagId + " to failed dag state store");
         this.failedDagStateStore.writeCheckpoint(this.dags.get(dagId));
@@ -1172,12 +1189,6 @@ public class DagManager extends AbstractIdleService {
         log.error("Failed to add dag " + dagId + " to failed dag state store", 
e);
       }
       this.failedDagIds.add(dagId);
-      if 
(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED.equals(dag.getFlowEvent())) 
{
-        this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
-      } else if 
(!TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED.equals(dag.getFlowEvent()))
 {
-        dagManagerMetrics.emitFlowFailedMetrics(flowId);
-      }
-      this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
DagManager.FlowState.FAILED);
     }
 
     /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index a81bf39b1..e9e605b06 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -17,16 +17,20 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.lang3.StringUtils;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 3dd0fb592..152dca00d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,8 +16,6 @@
  */
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.google.common.collect.ImmutableMap;
-import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -28,12 +26,14 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
-import java.util.stream.Collectors;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 9366b7a8a..ecb84d839 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.google.common.base.Optional;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
@@ -26,26 +25,27 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
-import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nullable;
 
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -168,6 +168,17 @@ public class DagManagerFlowTest {
         
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2",
 "group2",
         flowExecutionId3, "job0", "group2");
 
+    Mockito.doReturn(DagManagerTest.getMockFlowStatus("flow0", "group0", 
flowExecutionId1, String.valueOf(ExecutionStatus.CANCELLED)))
+        
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0",
 "group0",
+            flowExecutionId1, JobStatusRetriever.NA_KEY, 
JobStatusRetriever.NA_KEY);
+
+    Mockito.doReturn(DagManagerTest.getMockFlowStatus("flow1", "group1", 
flowExecutionId2, String.valueOf(ExecutionStatus.CANCELLED)))
+        
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1",
 "group1",
+            flowExecutionId2, JobStatusRetriever.NA_KEY, 
JobStatusRetriever.NA_KEY);
+
+    Mockito.doReturn(DagManagerTest.getMockFlowStatus("flow2", "group2", 
flowExecutionId3, String.valueOf(ExecutionStatus.CANCELLED)))
+        
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2",
 "group2",
+            flowExecutionId3, JobStatusRetriever.NA_KEY, 
JobStatusRetriever.NA_KEY);
     // check removal of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
         assertTrue(input -> 
!dagManager.dagManagerThreads[queue1].dagToJobs.containsKey(dagId1), 
ERROR_MESSAGE);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 655c4f895..0a572cf26 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Optional;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -38,13 +35,15 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
@@ -55,6 +54,7 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
@@ -124,7 +124,6 @@ public class DagManagerTest {
     failedDagIdsField.setAccessible(true);
     this.failedDagIds = (Set<String>) 
failedDagIdsField.get(this._dagManagerThread);
   }
-
   /**
    * Create a list of dags with only one node each
    * @return a Dag.
@@ -185,6 +184,10 @@ public class DagManagerTest {
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
 
+  static Iterator<JobStatus> getMockFlowStatus(String flowName, String 
flowGroup, Long flowExecutionId, String eventName) {
+    return getMockJobStatus(flowName, flowGroup, flowExecutionId, 
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, eventName);
+  }
+
   static Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String 
eventName) {
     return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, 
jobName, eventName, false, flowExecutionId + 10);
   }
@@ -219,6 +222,7 @@ public class DagManagerTest {
     Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName1, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING));
     Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName2, flowGroup, 
String.valueOf(ExecutionStatus.COMPLETE));
     Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName1, flowGroup, 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator8 = getMockFlowStatus(flowName, 
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.COMPLETE));
 
     
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
 Mockito.anyString(),
         Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
@@ -228,7 +232,8 @@ public class DagManagerTest {
         thenReturn(jobStatusIterator4).
         thenReturn(jobStatusIterator5).
         thenReturn(jobStatusIterator6).
-        thenReturn(jobStatusIterator7);
+        thenReturn(jobStatusIterator7).
+        thenReturn(jobStatusIterator8);
 
     //Run the thread once. Ensure the first job is running
     this._dagManagerThread.run();
@@ -311,6 +316,7 @@ public class DagManagerTest {
           getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
       Iterator<JobStatus> jobStatusIterator9 =
           getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+      Iterator<JobStatus> jobStatusIterator10 = getMockFlowStatus(flowName, 
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.FAILED));
 
 
       Mockito.when(_jobStatusRetriever
@@ -323,7 +329,8 @@ public class DagManagerTest {
           thenReturn(jobStatusIterator6).
           thenReturn(jobStatusIterator7).
           thenReturn(jobStatusIterator8).
-          thenReturn(jobStatusIterator9);
+          thenReturn(jobStatusIterator9).
+          thenReturn(jobStatusIterator10);
 
       //Run the thread once. Ensure the first job is running
       this._dagManagerThread.run();
@@ -410,17 +417,21 @@ public class DagManagerTest {
     Iterator<JobStatus> jobStatusIterator6 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.FAILED));
     Iterator<JobStatus> jobStatusIterator7 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, "NA_KEY", 
"NA_KEY", String.valueOf(ExecutionStatus.PENDING_RESUME));
+        getMockFlowStatus(flowName, flowGroup, flowExecutionId, 
String.valueOf(ExecutionStatus.FAILED));
     Iterator<JobStatus> jobStatusIterator8 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+        getMockFlowStatus(flowName, flowGroup, flowExecutionId, 
String.valueOf(ExecutionStatus.PENDING_RESUME));
     Iterator<JobStatus> jobStatusIterator9 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
     Iterator<JobStatus> jobStatusIterator10 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
     Iterator<JobStatus> jobStatusIterator11 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
     Iterator<JobStatus> jobStatusIterator12 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator13 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator14 =
+        getMockFlowStatus(flowName, flowGroup, flowExecutionId, 
String.valueOf(ExecutionStatus.COMPLETE));
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
@@ -435,7 +446,9 @@ public class DagManagerTest {
         thenReturn(jobStatusIterator9).
         thenReturn(jobStatusIterator10).
         thenReturn(jobStatusIterator11).
-        thenReturn(jobStatusIterator12);
+        thenReturn(jobStatusIterator12).
+        thenReturn(jobStatusIterator13).
+        thenReturn(jobStatusIterator14);
 
     // Run thread until job2 fails
     for (int i = 0; i < 4; i++) {
@@ -483,6 +496,7 @@ public class DagManagerTest {
     Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName2, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING));
     Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName1, flowGroup, 
String.valueOf(ExecutionStatus.COMPLETE));
     Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName2, flowGroup, 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator8 = getMockFlowStatus(flowName, 
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.COMPLETE));
 
     
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
 Mockito.anyString(),
         Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
@@ -492,7 +506,8 @@ public class DagManagerTest {
         thenReturn(jobStatusIterator4).
         thenReturn(jobStatusIterator5).
         thenReturn(jobStatusIterator6).
-        thenReturn(jobStatusIterator7);
+        thenReturn(jobStatusIterator7).
+        thenReturn(jobStatusIterator8);
 
     //Run the thread once. Ensure the first job is running
     this._dagManagerThread.run();
@@ -562,7 +577,7 @@ public class DagManagerTest {
     Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.PENDING_RETRY), true);
     Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING));
     Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.FAILED));
-
+    Iterator<JobStatus> jobStatusIterator8 = getMockFlowStatus(flowName, 
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.FAILED));
 
     
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
 Mockito.anyString(),
         Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
@@ -572,7 +587,8 @@ public class DagManagerTest {
         thenReturn(jobStatusIterator4).
         thenReturn(jobStatusIterator5).
         thenReturn(jobStatusIterator6).
-        thenReturn(jobStatusIterator7);
+        thenReturn(jobStatusIterator7).
+        thenReturn(jobStatusIterator8);
 
     // Run 4 times, first job fails every time and is retried
     for (int i = 0; i < 4; i++) {
@@ -633,17 +649,21 @@ public class DagManagerTest {
     Iterator<JobStatus> jobStatusIterator6 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.CANCELLED));
     Iterator<JobStatus> jobStatusIterator7 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, "NA_KEY", 
"NA_KEY", String.valueOf(ExecutionStatus.PENDING_RESUME));
-        Iterator<JobStatus> jobStatusIterator8 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+        getMockFlowStatus(flowName, flowGroup, flowExecutionId, 
String.valueOf(ExecutionStatus.CANCELLED));
+    Iterator<JobStatus> jobStatusIterator8 =
+        getMockFlowStatus(flowName, flowGroup, flowExecutionId, 
String.valueOf(ExecutionStatus.PENDING_RESUME));
     Iterator<JobStatus> jobStatusIterator9 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
     Iterator<JobStatus> jobStatusIterator10 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
     Iterator<JobStatus> jobStatusIterator11 =
-        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
     Iterator<JobStatus> jobStatusIterator12 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator13 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator14 = getMockFlowStatus(flowName, 
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.COMPLETE));
+
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
@@ -658,7 +678,9 @@ public class DagManagerTest {
         thenReturn(jobStatusIterator9).
         thenReturn(jobStatusIterator10).
         thenReturn(jobStatusIterator11).
-        thenReturn(jobStatusIterator12);
+        thenReturn(jobStatusIterator12).
+        thenReturn(jobStatusIterator13).
+        thenReturn(jobStatusIterator14);
 
     // Run until job2 cancelled
     for (int i = 0; i < 3; i++) {
@@ -706,27 +728,43 @@ public class DagManagerTest {
     //Add a dag to the queue of dags
     this.queue.offer(dag);
     // The start time should be 16 minutes ago, which is past the start SLA so 
the job should be cancelled
-    Iterator<JobStatus> jobStatusIterator1 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.ORCHESTRATED),
             false, flowExecutionId - Duration.ofMinutes(16).toMillis());
     // This is for the second Dag that does not match the SLA so should 
schedule normally
-    Iterator<JobStatus> jobStatusIterator2 =
+    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
         getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0, 
flowGroup1, String.valueOf(ExecutionStatus.ORCHESTRATED),
             false, flowExecutionId - Duration.ofMinutes(10).toMillis());
     // Let the first job get reported as cancelled due to SLA kill on start and 
clean up
-    Iterator<JobStatus> jobStatusIterator3 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.CANCELLED),
             false, flowExecutionId - Duration.ofMinutes(16).toMillis());
+    Iterator<JobStatus> jobStatusIteratorFlow0_2 =
+        getMockFlowStatus(flowName, flowGroup, flowExecutionId, 
String.valueOf(ExecutionStatus.CANCELLED));
     // Cleanup the running job that is scheduled normally
-    Iterator<JobStatus> jobStatusIterator4 =
+    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
         getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0, 
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_2 =
+        getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, "job1", 
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_3 =
+        getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, "job2", 
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_4 =
+        getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, "job2", 
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_5 =
+        getMockFlowStatus(flowName1, flowGroup1, flowExecutionId+1, 
String.valueOf(ExecutionStatus.COMPLETE));
 
     Mockito.when(_jobStatusRetriever
-        .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
-        thenReturn(jobStatusIterator1).
-        thenReturn(jobStatusIterator2).
-        thenReturn(jobStatusIterator3).
-        thenReturn(jobStatusIterator4);
+        .getJobStatusesForFlowExecution(Mockito.eq("flow0"), 
Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+        thenReturn(jobStatusIteratorFlow0_0).
+        thenReturn(jobStatusIteratorFlow0_1).
+        thenReturn(jobStatusIteratorFlow0_2);
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow1"), 
Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+        thenReturn(jobStatusIteratorFlow1_0).
+        thenReturn(jobStatusIteratorFlow1_1).
+        thenReturn(jobStatusIteratorFlow1_2).
+        
thenReturn(jobStatusIteratorFlow1_3).thenReturn(jobStatusIteratorFlow1_4).thenReturn(jobStatusIteratorFlow1_5);
 
     // Run the thread once. Ensure the first job is running
     this._dagManagerThread.run();
@@ -748,6 +786,7 @@ public class DagManagerTest {
     // Cleanup
     this._dagManagerThread.run();
     this._dagManagerThread.run();
+    this._dagManagerThread.run();
     Assert.assertEquals(this.dags.size(), 0);
     Assert.assertEquals(this.jobToDag.size(), 0);
     Assert.assertEquals(this.dagToJobs.size(), 0);
@@ -782,12 +821,20 @@ public class DagManagerTest {
     Iterator<JobStatus> jobStatusIterator3 =
         getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", 
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
             false, startOrchestrationTime);
-
+    Iterator<JobStatus> jobStatusIterator4 =
+        getMockFlowStatus("flow2", "flow2", flowExecutionId+1, 
String.valueOf(ExecutionStatus.CANCELLED));
+    Iterator<JobStatus> jobStatusIterator5 =
+        getMockFlowStatus("flow2", "flow2", flowExecutionId+1, 
String.valueOf(ExecutionStatus.CANCELLED));
+    Iterator<JobStatus> jobStatusIterator6 =
+        getMockFlowStatus("flow2", "flow2", flowExecutionId+1, 
String.valueOf(ExecutionStatus.CANCELLED));
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
         thenReturn(jobStatusIterator1).
         thenReturn(jobStatusIterator2).
-        thenReturn(jobStatusIterator3);
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator4).
+        thenReturn(jobStatusIterator5).
+        thenReturn(jobStatusIterator6);
 
     // Run the thread once. An SLA exceeded event should be emitted for all 3 jobs
     this._dagManagerThread.run();
@@ -836,10 +883,13 @@ public class DagManagerTest {
     // Job should have been run normally without breaking on SLA check, so we 
can just mark as completed for status
     Iterator<JobStatus> jobStatusIterator1 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId+1, jobName, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockFlowStatus(flowName, flowGroup, flowExecutionId+1, 
String.valueOf(ExecutionStatus.COMPLETE));
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
-        thenReturn(jobStatusIterator1);
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2);
 
     // Run the thread once. Job should run without crashing thread on SLA 
check and cleanup
     this._dagManagerThread.run();
@@ -855,31 +905,36 @@ public class DagManagerTest {
     this.queue.offer(dagList.get(0));
     Config jobConfig0 = 
dagList.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
     Config jobConfig1 = 
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
-    Iterator<JobStatus> jobStatusIterator0 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
-    Iterator<JobStatus> jobStatusIterator1 =
+    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
         getMockJobStatus("flow1", "group1", 
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group1", String.valueOf(ExecutionStatus.FAILED));
+    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+        getMockFlowStatus("flow1", "group1", 
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)), 
String.valueOf(ExecutionStatus.FAILED));
     // Cleanup the running job that is scheduled normally
-    Iterator<JobStatus> jobStatusIterator2 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
-    Iterator<JobStatus> jobStatusIterator3 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_2 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-
+    Iterator<JobStatus> jobStatusIteratorFlow0_3 =
+        getMockFlowStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)), 
String.valueOf(ExecutionStatus.COMPLETE));
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow0"), 
Mockito.eq("group0"), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator0)
-        .thenReturn(jobStatusIterator2)
-        .thenReturn(jobStatusIterator3);
+        .thenReturn(jobStatusIteratorFlow0_0)
+        .thenReturn(jobStatusIteratorFlow0_1)
+        .thenReturn(jobStatusIteratorFlow0_2)
+        .thenReturn(jobStatusIteratorFlow0_3);
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow1"), 
Mockito.eq("group1"), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator1);
+        .thenReturn(jobStatusIteratorFlow1_0)
+        .thenReturn(jobStatusIteratorFlow1_1);
 
     this._dagManagerThread.run();
     // dag will not be processed due to exceeding the quota, will log a 
message and exit out without adding it to dags
@@ -892,6 +947,10 @@ public class DagManagerTest {
         "user")).getCount(), 1);
 
     this._dagManagerThread.run(); // cleanup
+
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
   @Test (dependsOnMethods = "testDagManagerQuotaExceeded")
@@ -905,36 +964,44 @@ public class DagManagerTest {
     Config jobConfig1 = 
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
     Config jobConfig2 = 
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
 
-    Iterator<JobStatus> jobStatusIterator0 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
-    Iterator<JobStatus> jobStatusIterator1 =
+    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
         getMockJobStatus("flow1", "group1", 
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group1", String.valueOf(ExecutionStatus.FAILED));
-    Iterator<JobStatus> jobStatusIterator2 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
-    Iterator<JobStatus> jobStatusIterator3 =
-        getMockJobStatus("flow2", "group2", 
Long.valueOf(jobConfig2.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
-            "job0", "group2", String.valueOf(ExecutionStatus.FAILED));
-    Iterator<JobStatus> jobStatusIterator4 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_2 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-
+    Iterator<JobStatus> jobStatusIteratorFlow2_0 =
+        getMockJobStatus("flow2", "group2", 
Long.valueOf(jobConfig2.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group2", String.valueOf(ExecutionStatus.FAILED));
+    Iterator<JobStatus> jobStatusIteratorFlow0_3 =
+        getMockFlowStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)), 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+        getMockFlowStatus("flow1", "group2", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)), 
String.valueOf(ExecutionStatus.FAILED));
+    Iterator<JobStatus> jobStatusIteratorFlow2_1 =
+        getMockFlowStatus("flow1", "group2", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)), 
String.valueOf(ExecutionStatus.FAILED));
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow0"), 
Mockito.eq("group0"), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator0)
-        .thenReturn(jobStatusIterator2)
-        .thenReturn(jobStatusIterator4);
+        .thenReturn(jobStatusIteratorFlow0_0)
+        .thenReturn(jobStatusIteratorFlow0_1)
+        .thenReturn(jobStatusIteratorFlow0_2)
+        .thenReturn(jobStatusIteratorFlow0_3);
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow1"), 
Mockito.eq("group1"), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator1);
+        .thenReturn(jobStatusIteratorFlow1_0)
+        .thenReturn(jobStatusIteratorFlow1_1);
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow2"), 
Mockito.eq("group2"), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator3);
+        .thenReturn(jobStatusIteratorFlow2_0)
+        .thenReturn(jobStatusIteratorFlow2_1);
 
     this._dagManagerThread.run();
 
@@ -946,6 +1013,7 @@ public class DagManagerTest {
     // Test case where a job that exceeded a quota would cause a double 
decrement after fixing the proxy user name, allowing for more jobs to run
     this.queue.offer(dagList.get(2));
     this._dagManagerThread.run();
+
     // Assert that running dag metrics are only counted once
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -958,6 +1026,9 @@ public class DagManagerTest {
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 0);
 
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
   @Test (dependsOnMethods = "testQuotaDecrement")
@@ -967,39 +1038,44 @@ public class DagManagerTest {
     this.queue.offer(dagList.get(0));
     Config jobConfig0 = 
dagList.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
     Config jobConfig1 = 
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
-    Iterator<JobStatus> jobStatusIterator0 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED), 
true);
     // Cleanup the running job that is scheduled normally
-    Iterator<JobStatus> jobStatusIterator1 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.RUNNING), true);
-    Iterator<JobStatus> jobStatusIterator2 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_2 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED));
-    Iterator<JobStatus> jobStatusIterator3 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_3 =
         getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-    Iterator<JobStatus> jobStatusIterator4 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_4 =
+        getMockFlowStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)), 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
         getMockJobStatus("flow1", "group1", 
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED));
-    Iterator<JobStatus> jobStatusIterator5 =
+    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
         getMockJobStatus("flow1", "group1", 
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
             "job0", "group1", String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_2 =
+        getMockFlowStatus("flow1", "group1", 
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)), 
String.valueOf(ExecutionStatus.COMPLETE));
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow0"), 
Mockito.eq("group0"), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator0)
-        .thenReturn(jobStatusIterator1)
-        .thenReturn(jobStatusIterator2)
-        .thenReturn(jobStatusIterator3);
+        .thenReturn(jobStatusIteratorFlow0_0)
+        .thenReturn(jobStatusIteratorFlow0_1)
+        .thenReturn(jobStatusIteratorFlow0_2)
+        .thenReturn(jobStatusIteratorFlow0_3)
+        .thenReturn(jobStatusIteratorFlow0_4);
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow1"), 
Mockito.eq("group1"), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator4)
-        .thenReturn(jobStatusIterator5);
-
+        .thenReturn(jobStatusIteratorFlow1_0)
+        .thenReturn(jobStatusIteratorFlow1_1)
+        .thenReturn(jobStatusIteratorFlow1_2);
     // Dag1 is running
     this._dagManagerThread.run();
     SortedMap<String, Counter> allCounters = 
metricContext.getParent().get().getCounters();
@@ -1021,6 +1097,13 @@ public class DagManagerTest {
     this.queue.offer(dagList.get(1));
     this._dagManagerThread.run();
     this._dagManagerThread.run(); // cleanup
+    Assert.assertEquals(allCounters.get(MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ServiceMetricNames.SERVICE_USERS,
+        "user")).getCount(), 0);
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
   @Test (dependsOnMethods = "testQuotasRetryFlow")
@@ -1031,20 +1114,25 @@ public class DagManagerTest {
         
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
 false).build());    //Add a dag to the queue of dags
     this.queue.offer(adhocDag);
 
-    Iterator<JobStatus> jobStatusIterator1 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
         getMockJobStatus("flow" + flowId, "group" + flowId, flowId, "job0", 
"group0", String.valueOf(ExecutionStatus.COMPLETE));
-    Iterator<JobStatus> jobStatusIterator2 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
+        getMockFlowStatus("flow" + flowId, "group" + flowId, flowId, 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
         getMockJobStatus("flow" + flowId+1, "group" + flowId+1, flowId+1, 
"job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-
+    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+        getMockFlowStatus("flow" + flowId+1, "group" + flowId+1, flowId+1, 
String.valueOf(ExecutionStatus.COMPLETE));
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow" + flowId), 
Mockito.eq("group" + flowId), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator1);
+        .thenReturn(jobStatusIteratorFlow0_0)
+        .thenReturn(jobStatusIteratorFlow0_1);
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow" + (flowId+1)), 
Mockito.eq("group" + (flowId+1)), Mockito.anyLong(),
             Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIterator2);
+        .thenReturn(jobStatusIteratorFlow1_0)
+        .thenReturn(jobStatusIteratorFlow1_1);
 
     String flowStateGaugeName0 = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group"+flowId,
         "flow"+flowId, ServiceMetricNames.RUNNING_STATUS);
@@ -1063,6 +1151,9 @@ public class DagManagerTest {
     this._dagManagerThread.run();
     // should be successful since it should be cleaned up with status complete
     
Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(),
 DagManager.FlowState.SUCCESSFUL.value);
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
   @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc")
@@ -1087,33 +1178,48 @@ public class DagManagerTest {
     this.queue.offer(dagList.get(1));
     this.queue.offer(dagList.get(2));;
     // Set orchestration time to be 20 minutes in the past, the job should be 
marked as SLA killed
-    Iterator<JobStatus> jobStatusIterator1 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
         getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0", 
String.valueOf(ExecutionStatus.RUNNING),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator2 =
-        getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1", 
String.valueOf(ExecutionStatus.RUNNING),
+    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
+        getMockJobStatus("flow1", "group1", flowExecutionId, "job0", "group1", 
String.valueOf(ExecutionStatus.RUNNING),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator3 =
-        getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2", 
String.valueOf(ExecutionStatus.RUNNING),
+    Iterator<JobStatus> jobStatusIteratorFlow2_0 =
+        getMockJobStatus("flow2", "group2", flowExecutionId, "job0", "group2", 
String.valueOf(ExecutionStatus.RUNNING),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator4 =
-        getMockJobStatus("flow0", "flow0", flowExecutionId, "job0", "group0", 
String.valueOf(ExecutionStatus.CANCELLED),
+    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
+        getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0", 
String.valueOf(ExecutionStatus.CANCELLED),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator5 =
-        getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1", 
String.valueOf(ExecutionStatus.CANCELLED),
+    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+        getMockJobStatus("flow1", "group1", flowExecutionId, "job0", "group1", 
String.valueOf(ExecutionStatus.CANCELLED),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator6 =
-        getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2", 
String.valueOf(ExecutionStatus.CANCELLED),
+    Iterator<JobStatus> jobStatusIteratorFlow2_1 =
+        getMockJobStatus("flow2", "group2", flowExecutionId, "job0", "group2", 
String.valueOf(ExecutionStatus.CANCELLED),
             false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIteratorFlow0_2 =
+        getMockFlowStatus("flow0", "group0", flowExecutionId, 
String.valueOf(ExecutionStatus.CANCELLED));
+    Iterator<JobStatus> jobStatusIteratorFlow1_2 =
+        getMockFlowStatus("flow1", "group1", flowExecutionId, 
String.valueOf(ExecutionStatus.CANCELLED));
+    Iterator<JobStatus> jobStatusIteratorFlow2_2 =
+        getMockFlowStatus("flow2", "group2", flowExecutionId, 
String.valueOf(ExecutionStatus.CANCELLED));
 
     Mockito.when(_jobStatusRetriever
-        .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
-        thenReturn(jobStatusIterator1).
-        thenReturn(jobStatusIterator2).
-        thenReturn(jobStatusIterator3).
-        thenReturn(jobStatusIterator4).
-        thenReturn(jobStatusIterator5).
-        thenReturn(jobStatusIterator6);
+        .getJobStatusesForFlowExecution(Mockito.eq("flow0"), 
Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+        thenReturn(jobStatusIteratorFlow0_0).
+        thenReturn(jobStatusIteratorFlow0_1).
+        thenReturn(jobStatusIteratorFlow0_2);
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow1"), 
Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+        thenReturn(jobStatusIteratorFlow1_0).
+        thenReturn(jobStatusIteratorFlow1_1).
+        thenReturn(jobStatusIteratorFlow1_2);
+
+    Mockito.when(_jobStatusRetriever
+    .getJobStatusesForFlowExecution(Mockito.eq("flow2"), Mockito.eq("group2"), 
Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIteratorFlow2_0).
+        thenReturn(jobStatusIteratorFlow2_1).
+        thenReturn(jobStatusIteratorFlow2_2);
 
     // Run the thread once. An SLA exceeded event should be emitted for all 3 jobs
     this._dagManagerThread.run();
@@ -1126,7 +1232,6 @@ public class DagManagerTest {
     String slakilledGroupName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", 
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
     
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(),
 2);
     
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(),
 1);
-    
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledGroupName).getCount(),
 1);
     // Cleanup
     this._dagManagerThread.run();
     
Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount(),
 previousSlaKilledCount + 3);
@@ -1160,39 +1265,48 @@ public class DagManagerTest {
     this.queue.offer(dagList.get(1));
     this.queue.offer(dagList.get(2));;
     // The start time should be 16 minutes ago, which is past the start SLA so 
the job should be cancelled
-    Iterator<JobStatus> jobStatusIterator1 =
+    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
         getMockJobStatus( "flow0", "group0", flowExecutionId, "job0", 
"group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator2 =
-        getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", 
"group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
+    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
+        getMockJobStatus("flow1", "group1", flowExecutionId+1, "job0", 
"group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator3 =
-        getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", 
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
+    Iterator<JobStatus> jobStatusIteratorFlow2_0 =
+        getMockJobStatus("flow2", "group2", flowExecutionId+1, "job0", 
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator4 =
-        getMockJobStatus( "flow0", "flow0", flowExecutionId+1, "job0", 
"group0", String.valueOf(ExecutionStatus.COMPLETE),
+    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
+        getMockJobStatus( "flow0", "group0", flowExecutionId+1, "job0", 
"group0", String.valueOf(ExecutionStatus.COMPLETE),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator5 =
-        getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", 
"group1", String.valueOf(ExecutionStatus.FAILED),
+    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+        getMockJobStatus("flow1", "group1", flowExecutionId+1, "job0", 
"group1", String.valueOf(ExecutionStatus.FAILED),
             false, flowExecutionId);
-    Iterator<JobStatus> jobStatusIterator6 =
-        getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", 
"group2", String.valueOf(ExecutionStatus.COMPLETE),
+    Iterator<JobStatus> jobStatusIteratorFlow2_1 =
+        getMockJobStatus("flow2", "group2", flowExecutionId+1, "job0", 
"group2", String.valueOf(ExecutionStatus.COMPLETE),
             false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIteratorFlow0_2 =
+        getMockFlowStatus( "flow0", "group0", flowExecutionId+1, 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIteratorFlow1_2 =
+        getMockFlowStatus("flow1", "group1", flowExecutionId+1, 
String.valueOf(ExecutionStatus.FAILED));
+    Iterator<JobStatus> jobStatusIteratorFlow2_2 =
+        getMockFlowStatus("flow2", "group2", flowExecutionId+1, 
String.valueOf(ExecutionStatus.COMPLETE));
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow0"), 
Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
-        thenReturn(jobStatusIterator1).
-        thenReturn(jobStatusIterator4);
+        thenReturn(jobStatusIteratorFlow0_0).
+        thenReturn(jobStatusIteratorFlow0_1).
+        thenReturn(jobStatusIteratorFlow0_2);
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow1"), 
Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
-        thenReturn(jobStatusIterator2).
-        thenReturn(jobStatusIterator5);
+        thenReturn(jobStatusIteratorFlow1_0).
+        thenReturn(jobStatusIteratorFlow1_1).
+        thenReturn(jobStatusIteratorFlow1_2);
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.eq("flow2"), 
Mockito.eq("group2"), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
-        thenReturn(jobStatusIterator3).
-        thenReturn(jobStatusIterator6);
+        thenReturn(jobStatusIteratorFlow2_0).
+        thenReturn(jobStatusIteratorFlow2_1).
+        thenReturn(jobStatusIteratorFlow2_2);
 
     this._dagManagerThread.run();
 
@@ -1213,9 +1327,8 @@ public class DagManagerTest {
     Assert.assertEquals(this.dags.size(), 0);
     Assert.assertEquals(this.jobToDag.size(), 0);
     Assert.assertEquals(this.dagToJobs.size(), 0);
-  }
-
 
+  }
 
   @AfterClass
   public void cleanUp() throws Exception {

Reply via email to