Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 95eff37ce -> c103a8f6a


[GOBBLIN-614][GOBBLIN-599][GOBBLIN-598][GOBBLIN-615][GOBBLIN-616] Allow 
multiple flow failure options in DagManager.

Closes #2481 from sv2000/retention


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c103a8f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c103a8f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c103a8f6

Branch: refs/heads/master
Commit: c103a8f6a6fc09926fdb2b62ead516b63a4ba9aa
Parents: 95eff37
Author: suvasude <[email protected]>
Authored: Tue Oct 23 13:35:42 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Oct 23 13:35:42 2018 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   1 +
 .../modules/orchestration/DagManager.java       | 142 ++++++++----
 .../modules/orchestration/DagManagerUtils.java  |  54 +++--
 .../service/modules/spec/JobExecutionPlan.java  |   7 +-
 .../modules/orchestration/DagManagerTest.java   | 218 +++++++++++--------
 .../orchestration/DagManagerUtilsTest.java      | 117 ----------
 6 files changed, 268 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 40d2f44..ffc2376 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -132,6 +132,7 @@ public class ConfigurationKeys {
   public static final String FLOW_GROUP_KEY = "flow.group";
   public static final String FLOW_DESCRIPTION_KEY = "flow.description";
   public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
+  public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
 
   /**
    * Common topology configuration properties.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ac304fb..2d0c555 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -50,12 +50,18 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
+import static org.apache.gobblin.service.ExecutionStatus.COMPLETE;
+import static org.apache.gobblin.service.ExecutionStatus.FAILED;
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
 
 /**
  * This class implements a manager to manage the life cycle of a {@link Dag}. 
A {@link Dag} is submitted to the
@@ -78,16 +84,43 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Alpha
 @Slf4j
 public class DagManager extends AbstractIdleService {
+  public static final String DEFAULT_FLOW_FAILURE_OPTION = 
FailureOption.FINISH_ALL_POSSIBLE.name();
+
   private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
   private static final Integer DEFAULT_NUM_THREADS = 3;
   private static final Integer TERMINATION_TIMEOUT = 30;
+  private static final String DAG_MANAGER_PREFIX = 
"gobblin.service.dagManager.";
+  private static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + 
"numThreads";
+  private static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
DAG_MANAGER_PREFIX + "pollingInterval";
+  private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + 
"jobStatusRetriever";
+  private static final String DAG_STORE_CLASS_KEY = DAG_MANAGER_PREFIX + 
"dagStateStoreClass";
+
+  static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + 
"dagStateStoreDir";
 
-  public static final String DAG_MANAGER_PREFIX = 
"gobblin.service.dagManager.";
-  public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + 
"numThreads";
-  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
DAG_MANAGER_PREFIX + "pollingInterval";
-  public static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + 
"jobStatusRetriever";
-  public static final String DAG_STORE_CLASS_KEY = DAG_MANAGER_PREFIX + 
"dagStateStoreClass";
-  public static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + 
"dagStateStoreDir";
+  /**
+   * Action to be performed on a {@link Dag}, in case of a job failure. 
Currently, we allow 2 modes:
+   * <ul>
+   *   <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
+   *   <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to 
finish, as long as all the dependencies
+   *   of the job are successful.</li>
+   * </ul>
+   */
+  public enum FailureOption {
+    FINISH_RUNNING("FINISH_RUNNING"),
+    CANCEL("CANCEL"),
+    FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");
+
+    private final String failureOption;
+
+    FailureOption(final String failureOption) {
+      this.failureOption = failureOption;
+    }
+
+    @Override
+    public String toString() {
+      return this.failureOption;
+    }
+  }
 
   private BlockingQueue<Dag<JobExecutionPlan>> queue;
   private ScheduledExecutorService scheduledExecutorPool;
@@ -138,7 +171,7 @@ public class DagManager extends AbstractIdleService {
    * submitted dag to the {@link DagStateStore} and then adds the dag to a 
{@link BlockingQueue} to be picked up
    * by one of the {@link DagManagerThread}s.
    */
-  public synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException 
{
+  synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException {
     //Persist the dag
     this.dagStateStore.writeCheckpoint(dag);
     //Add it to the queue of dags
@@ -175,10 +208,11 @@ public class DagManager extends AbstractIdleService {
    * </ol>
    */
   public static class DagManagerThread implements Runnable {
-    private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+    private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
-    private final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> 
dagToJobs = new HashMap<>();
-    private final Set<String> failedDagIds = new HashSet<>();
+    private final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs 
= new HashMap<>();
+    private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
+    private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
     private final MetricContext metricContext;
     private final Optional<EventSubmitter> eventSubmitter;
 
@@ -189,8 +223,8 @@ public class DagManager extends AbstractIdleService {
     /**
      * Constructor.
      */
-    public DagManagerThread(JobStatusRetriever jobStatusRetriever, 
DagStateStore dagStateStore, BlockingQueue<Dag<JobExecutionPlan>> queue,
-        boolean instrumentationEnabled) {
+    DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore,
+        BlockingQueue<Dag<JobExecutionPlan>> queue, boolean 
instrumentationEnabled) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.queue = queue;
@@ -204,7 +238,8 @@ public class DagManager extends AbstractIdleService {
     }
 
     /**
-     * Main body of the {@link DagManagerThread}.
+     * Main body of the {@link DagManagerThread}. Dequeue the next item from the 
queue and poll job statuses of currently
+     * running jobs.
      */
     @Override
     public void run() {
@@ -213,7 +248,9 @@ public class DagManager extends AbstractIdleService {
         //Poll the queue for a new Dag to execute.
         if (nextItem != null) {
           Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) nextItem;
-
+          if (dag.isEmpty()) {
+            log.info("Empty dag; ignoring the dag");
+          }
           //Initialize dag.
           initialize(dag);
         }
@@ -251,18 +288,18 @@ public class DagManager extends AbstractIdleService {
       }
 
       this.dags.put(dagId, dag);
-      log.info("Dag {} - determining if any jobs are already running.");
+      log.info("Dag {} - determining if any jobs are already running.", dagId);
       //Are there any jobs already in the running state? This check is for 
Dags already running
       //before a leadership change occurs.
-      for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
-        if (DagManagerUtils.getExecutionStatus(dagNode) == 
ExecutionStatus.RUNNING) {
+      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
           addJobState(dagId, dagNode);
         }
       }
-      log.info("Dag {} submitting jobs ready for execution.");
+      log.info("Dag {} submitting jobs ready for execution.", dagId);
       //Determine the next set of jobs to run and submit them for execution
       submitNext(dagId);
-      log.info("Dag {} Initialization complete.");
+      log.info("Dag {} Initialization complete.", dagId);
     }
 
     /**
@@ -271,8 +308,8 @@ public class DagManager extends AbstractIdleService {
      */
     private void pollJobStatuses()
         throws IOException {
-      this.failedDagIds.clear();
-      for (Dag.DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
+      this.failedDagIdsFinishRunning.clear();
+      for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
         TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent()
             ? 
eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.JOB_STATUS_POLLED)
             : null;
@@ -282,19 +319,21 @@ public class DagManager extends AbstractIdleService {
         }
         Preconditions.checkNotNull(jobStatus, "Received null job status for a 
running job " + DagManagerUtils.getJobName(node));
         JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
-        //TODO: This will be updated when JobStatus schema provides the 
correct execution status.
-        //Currently, it is a placeholder.
-        switch (jobStatus.getEventName()) {
-          case TimingEvent.LauncherTimings.JOB_COMPLETE:
-            jobExecutionPlan.setExecutionStatus(ExecutionStatus.COMPLETE);
+
+        ExecutionStatus status = valueOf(jobStatus.getEventName());
+
+        switch (status) {
+          case COMPLETE:
+            jobExecutionPlan.setExecutionStatus(COMPLETE);
             onJobFinish(node);
             break;
-          case TimingEvent.LauncherTimings.JOB_FAILED:
-            jobExecutionPlan.setExecutionStatus(ExecutionStatus.FAILED);
+          case FAILED:
+          case CANCELLED:
+            jobExecutionPlan.setExecutionStatus(FAILED);
             onJobFinish(node);
             break;
           default:
-            jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+            jobExecutionPlan.setExecutionStatus(RUNNING);
             break;
         }
       }
@@ -303,7 +342,7 @@ public class DagManager extends AbstractIdleService {
     /**
      * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
      */
-    private JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
+    private JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode) {
       Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
       String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
@@ -320,11 +359,11 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    public void submitNext(String dagId)
-        throws IOException {
+    void submitNext(String dagId) throws IOException {
       Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+      Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
       //Submit jobs from the dag ready for execution.
-      for (Dag.DagNode<JobExecutionPlan> dagNode : 
DagManagerUtils.getNext(dag)) {
+      for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
         submitJob(dagNode);
         addJobState(dagId, dagNode);
       }
@@ -335,9 +374,9 @@ public class DagManager extends AbstractIdleService {
     /**
      * Submits a {@link JobSpec} to a {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
      */
-    private void submitJob(Dag.DagNode<JobExecutionPlan> dagNode) {
+    private void submitJob(DagNode<JobExecutionPlan> dagNode) {
       JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
-      jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+      jobExecutionPlan.setExecutionStatus(RUNNING);
       JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
 
       // Run this spec on selected executor
@@ -359,7 +398,7 @@ public class DagManager extends AbstractIdleService {
      * Method that defines the actions to be performed when a job finishes 
either successfully or with failure.
      * This method updates the state of the dag and performs clean up actions 
as necessary.
      */
-    private void onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode)
+    private void onJobFinish(DagNode<JobExecutionPlan> dagNode)
         throws IOException {
       Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
       String dagId = DagManagerUtils.generateDagId(dag);
@@ -369,25 +408,29 @@ public class DagManager extends AbstractIdleService {
 
       deleteJobState(dagId, dagNode);
 
-      if (jobStatus == ExecutionStatus.COMPLETE) {
+      if (jobStatus == COMPLETE) {
         submitNext(dagId);
-      } else if (jobStatus == ExecutionStatus.FAILED) {
-        this.failedDagIds.add(dagId);
+      } else if (jobStatus == FAILED) {
+        if (DagManagerUtils.getFailureOption(dag) == 
FailureOption.FINISH_RUNNING) {
+          this.failedDagIdsFinishRunning.add(dagId);
+        } else {
+          this.failedDagIdsFinishAllPossible.add(dagId);
+        }
       }
     }
 
-    private void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode) {
+    private void deleteJobState(String dagId, DagNode<JobExecutionPlan> 
dagNode) {
       this.jobToDag.remove(dagNode);
       this.dagToJobs.get(dagId).remove(dagNode);
     }
 
-    private void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode) {
+    private void addJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
       Dag<JobExecutionPlan> dag = this.dags.get(dagId);
       this.jobToDag.put(dagNode, dag);
       if (this.dagToJobs.containsKey(dagId)) {
         this.dagToJobs.get(dagId).add(dagNode);
       } else {
-        LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
Lists.newLinkedList();
+        LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
Lists.newLinkedList();
         dagNodeList.add(dagNode);
         this.dagToJobs.put(dagId, dagNodeList);
       }
@@ -402,21 +445,26 @@ public class DagManager extends AbstractIdleService {
      */
     private void cleanUp() {
       //Clean up failed dags
-      for (String dagId : this.failedDagIds) {
+      for (String dagId : this.failedDagIdsFinishRunning) {
         //Skip monitoring of any other jobs of the failed dag.
-        LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
+        LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
         while (!dagNodeList.isEmpty()) {
-          Dag.DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+          DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
           deleteJobState(dagId, dagNode);
         }
         log.info("Dag {} has finished with status FAILED; Cleaning up dag from 
the state store.", dagId);
         cleanUpDag(dagId);
       }
 
-      //Clean up successfully completed dags
+      //Clean up completed dags
       for (String dagId : this.dags.keySet()) {
         if (!hasRunningJobs(dagId)) {
-          log.info("Dag {} has finished with status COMPLETE; Cleaning up dag 
from the state store.", dagId);
+          String status = "COMPLETE";
+          if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
+            status = "FAILED";
+            this.failedDagIdsFinishAllPossible.remove(dagId);
+          }
+          log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, status);
           cleanUpDag(dagId);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 348aa78..9ac51fe 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -31,7 +31,11 @@ import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
+import 
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
 
 public class DagManagerUtils {
   /**
@@ -39,7 +43,7 @@ public class DagManagerUtils {
    * @param dag instance of a {@link Dag}.
    * @return a String id associated corresponding to the {@link Dag} instance.
    */
-  public static String generateDagId(Dag<JobExecutionPlan> dag) {
+  static String generateDagId(Dag<JobExecutionPlan> dag) {
     Config jobConfig = 
dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
@@ -47,28 +51,28 @@ public class DagManagerUtils {
     return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
   }
 
-  public static String getJobName(Dag.DagNode<JobExecutionPlan> dagNode) {
+  static String getJobName(DagNode<JobExecutionPlan> dagNode) {
     return 
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
   }
 
-  public static JobExecutionPlan 
getJobExecutionPlan(Dag.DagNode<JobExecutionPlan> dagNode) {
+  static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan> 
dagNode) {
     return dagNode.getValue();
   }
 
-  public static JobSpec getJobSpec(Dag.DagNode<JobExecutionPlan> dagNode) {
+  public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
     return dagNode.getValue().getJobSpec();
   }
 
-  public static Config getJobConfig(Dag.DagNode<JobExecutionPlan> dagNode) {
+  static Config getJobConfig(DagNode<JobExecutionPlan> dagNode) {
     return dagNode.getValue().getJobSpec().getConfig();
   }
 
-  public static SpecProducer getSpecProducer(Dag.DagNode<JobExecutionPlan> 
dagNode)
+  static SpecProducer getSpecProducer(DagNode<JobExecutionPlan> dagNode)
       throws ExecutionException, InterruptedException {
     return dagNode.getValue().getSpecExecutor().getProducer().get();
   }
 
-  public static ExecutionStatus 
getExecutionStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
+  static ExecutionStatus getExecutionStatus(DagNode<JobExecutionPlan> dagNode) 
{
     return dagNode.getValue().getExecutionStatus();
   }
 
@@ -77,17 +81,19 @@ public class DagManagerUtils {
    * identifies each node yet to be executed and for which each of its parent 
nodes is in the {@link ExecutionStatus#COMPLETE}
    * state.
    */
-  public static Set<Dag.DagNode<JobExecutionPlan>> 
getNext(Dag<JobExecutionPlan> dag) {
-    Set<Dag.DagNode<JobExecutionPlan>> nextNodesToExecute = new HashSet<>();
-    LinkedList<Dag.DagNode<JobExecutionPlan>> nodesToExpand = 
Lists.newLinkedList(dag.getStartNodes());
+  static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag) {
+    Set<DagNode<JobExecutionPlan>> nextNodesToExecute = new HashSet<>();
+    LinkedList<DagNode<JobExecutionPlan>> nodesToExpand = 
Lists.newLinkedList(dag.getStartNodes());
+    FailureOption failureOption = getFailureOption(dag);
 
     while (!nodesToExpand.isEmpty()) {
-      Dag.DagNode<JobExecutionPlan> node = nodesToExpand.poll();
+      DagNode<JobExecutionPlan> node = nodesToExpand.poll();
+      ExecutionStatus executionStatus = getExecutionStatus(node);
       boolean addFlag = true;
-      if (getExecutionStatus(node) == ExecutionStatus.$UNKNOWN) {
+      if (executionStatus == ExecutionStatus.$UNKNOWN) {
         //Add a node to be executed next, only if all of its parent nodes are 
COMPLETE.
-        List<Dag.DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
-        for (Dag.DagNode<JobExecutionPlan> parentNode : parentNodes) {
+        List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
+        for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
           if (getExecutionStatus(parentNode) != ExecutionStatus.COMPLETE) {
             addFlag = false;
             break;
@@ -96,14 +102,28 @@ public class DagManagerUtils {
         if (addFlag) {
           nextNodesToExecute.add(node);
         }
-      } else if (getExecutionStatus(node) == ExecutionStatus.COMPLETE) {
+      } else if (executionStatus == ExecutionStatus.COMPLETE) {
         //Explore the children of COMPLETED node as next candidates for 
execution.
         nodesToExpand.addAll(dag.getChildren(node));
-      } else {
-        return new HashSet<>();
+      } else if ((executionStatus == ExecutionStatus.FAILED) || 
(executionStatus == ExecutionStatus.CANCELLED)) {
+        switch (failureOption) {
+          case FINISH_RUNNING:
+            return new HashSet<>();
+          case FINISH_ALL_POSSIBLE:
+          default:
+            break;
+        }
       }
     }
     return nextNodesToExecute;
   }
 
+  static FailureOption getFailureOption(Dag<JobExecutionPlan> dag) {
+    if (dag.isEmpty()) {
+      return null;
+    }
+    DagNode<JobExecutionPlan> dagNode = dag.getStartNodes().get(0);
+    String failureOption = ConfigUtils.getString(getJobConfig(dagNode), 
ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
+    return FailureOption.valueOf(failureOption);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 023e104..5e9f86c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -35,6 +35,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -71,6 +72,7 @@ public class JobExecutionPlan {
       String flowName = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_NAME_KEY, "");
       String flowGroup = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_GROUP_KEY, "");
       String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
+      String flowFailureOption = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
 
       //Modify the job name to include the flow group, flow name and a 
randomly generated integer to make the job name unique.
       jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
flowName, jobName, random.nextInt(Integer.MAX_VALUE));
@@ -101,8 +103,9 @@ public class JobExecutionPlan {
       
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, 
ConfigValueFactory.fromAnyRef(jobName)));
       
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_GROUP_KEY,
 ConfigValueFactory.fromAnyRef(flowGroup)));
 
-      //Enable job lock for each job to prevent concurrent executions.
-      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_LOCK_ENABLED_KEY,
 ConfigValueFactory.fromAnyRef(true)));
+      //Add flow failure option
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_FAILURE_OPTION,
+          ConfigValueFactory.fromAnyRef(flowFailureOption)));
 
       // Reset properties in Spec from Config
       
jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index f7aaaed..f23ece1 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -35,17 +36,19 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.service.monitoring.JobStatus;
@@ -58,8 +61,8 @@ public class DagManagerTest {
   private JobStatusRetriever _jobStatusRetriever;
   private DagManager.DagManagerThread _dagManagerThread;
   private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
-  private Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
-  private Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs;
+  private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
+  private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
   private Map<String, Dag<JobExecutionPlan>> dags;
 
   @BeforeClass
@@ -74,11 +77,11 @@ public class DagManagerTest {
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
-    this.jobToDag = (Map<Dag.DagNode<JobExecutionPlan>, 
Dag<JobExecutionPlan>>) jobToDagField.get(this._dagManagerThread);
+    this.jobToDag = (Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>) 
jobToDagField.get(this._dagManagerThread);
 
     Field dagToJobsField = 
DagManager.DagManagerThread.class.getDeclaredField("dagToJobs");
     dagToJobsField.setAccessible(true);
-    this.dagToJobs = (Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>>) 
dagToJobsField.get(this._dagManagerThread);
+    this.dagToJobs = (Map<String, LinkedList<DagNode<JobExecutionPlan>>>) 
dagToJobsField.get(this._dagManagerThread);
 
     Field dagsField = 
DagManager.DagManagerThread.class.getDeclaredField("dags");
     dagsField.setAccessible(true);
@@ -87,12 +90,13 @@ public class DagManagerTest {
   }
 
   /**
-   * Create a {@link Dag < JobExecutionPlan >} with one parent and remaining 
nodes as its children.
+   * Create a {@link Dag <JobExecutionPlan>}.
    * @return a Dag.
    */
-  public Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, int 
numNodes)
+  private Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, 
String flowFailureOption, boolean flag)
       throws URISyntaxException {
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    int numNodes = (flag) ? 3 : 5;
     for (int i = 0; i < numNodes; i++) {
       String suffix = Integer.toString(i);
       Config jobConfig = ConfigBuilder.create().
@@ -100,9 +104,14 @@ public class DagManagerTest {
           addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
           addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId).
           addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
-          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
-      if (i > 0) {
+          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, 
flowFailureOption).build();
+      if ((i == 1) || (i == 2)) {
         jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0"));
+      } else if (i == 3) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job1"));
+      } else if (i == 4) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job2"));
       }
       JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
           withTemplate(new URI("job" + suffix)).build();
@@ -119,28 +128,27 @@ public class DagManagerTest {
   }
 
   @Test
-  public void testSuccessfulDag() throws URISyntaxException {
+  public void testSuccessfulDag() throws URISyntaxException, IOException {
     long flowExecutionId = System.currentTimeMillis();
     String flowGroupId = "0";
     String flowGroup = "group" + flowGroupId;
     String flowName = "flow" + flowGroupId;
-    String jobGroup = flowGroup;
     String jobName0 = "job0";
     String jobName1 = "job1";
     String jobName2 = "job2";
 
-    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 3);
+    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 
"FINISH_RUNNING", true);
     String dagId = DagManagerUtils.generateDagId(dag);
 
     //Add a dag to the queue of dags
     this.queue.offer(dag);
-    Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName0, 
TimingEvent.LauncherTimings.JOB_RUN);
-    Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName0, 
TimingEvent.LauncherTimings.JOB_COMPLETE);
-    Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName1, 
TimingEvent.LauncherTimings.JOB_RUN);
-    Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName2, 
TimingEvent.LauncherTimings.JOB_RUN);
-    Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName1, 
TimingEvent.LauncherTimings.JOB_RUN);
-    Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName2, 
TimingEvent.LauncherTimings.JOB_COMPLETE);
-    Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName2, 
TimingEvent.LauncherTimings.JOB_COMPLETE);
+    Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, flowGroup, jobName0, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, flowGroup, jobName0, 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, flowGroup, jobName1, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, flowGroup, jobName2, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, flowGroup, jobName1, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, flowGroup, jobName2, 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, flowGroup, jobName1, 
String.valueOf(ExecutionStatus.COMPLETE));
 
     
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
 Mockito.anyString(),
         Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
@@ -195,78 +203,112 @@ public class DagManagerTest {
     Assert.assertEquals(this.dags.size(), 0);
     Assert.assertEquals(this.jobToDag.size(), 0);
     Assert.assertEquals(this.dagToJobs.size(), 0);
+    Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
   }
 
   @Test (dependsOnMethods = "testSuccessfulDag")
-  public void testFailedDag() throws URISyntaxException {
-    long flowExecutionId = System.currentTimeMillis();
-    String flowGroupId = "0";
-    String flowGroup = "group" + flowGroupId;
-    String flowName = "flow" + flowGroupId;
-    String jobGroup = flowGroup;
-    String jobName0 = "job0";
-    String jobName1 = "job1";
-    String jobName2 = "job2";
-
-    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 3);
-    String dagId = DagManagerUtils.generateDagId(dag);
-
-    //Add a dag to the queue of dags
-    this.queue.offer(dag);
-    Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName0, 
TimingEvent.LauncherTimings.JOB_START);
-    Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName0, 
TimingEvent.LauncherTimings.JOB_COMPLETE);
-
-    Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName1, 
TimingEvent.LauncherTimings.JOB_RUN);
-    Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName2, 
TimingEvent.LauncherTimings.JOB_RUN);
-
-    Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName1, 
TimingEvent.LauncherTimings.JOB_RUN);
-    Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobGroup, jobName2, 
TimingEvent.LauncherTimings.JOB_FAILED);
-
-    
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
 Mockito.anyString(),
-        Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
-        thenReturn(jobStatusIterator1).
-        thenReturn(jobStatusIterator2).
-        thenReturn(jobStatusIterator3).
-        thenReturn(jobStatusIterator4).
-        thenReturn(jobStatusIterator5).
-        thenReturn(jobStatusIterator6);
-
-    //Run the thread once. Ensure the first job is running
-    this._dagManagerThread.run();
-    Assert.assertEquals(this.dags.size(), 1);
-    Assert.assertTrue(this.dags.containsKey(dagId));
-    Assert.assertEquals(this.jobToDag.size(), 1);
-    Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
-    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
-    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
-
-    //Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 
are submitted.
-    this._dagManagerThread.run();
-    Assert.assertEquals(this.dags.size(), 1);
-    Assert.assertTrue(this.dags.containsKey(dagId));
-    Assert.assertEquals(this.jobToDag.size(), 2);
-    Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0)));
-    Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1)));
-    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
-    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0)));
-    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1)));
-
-    //Run the thread 3rd time. Ensure the job0 is complete and job1 and job2 
are running.
-    this._dagManagerThread.run();
-    Assert.assertEquals(this.dags.size(), 1);
-    Assert.assertTrue(this.dags.containsKey(dagId));
-    Assert.assertEquals(this.jobToDag.size(), 2);
-    Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0)));
-    Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1)));
-    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
-    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0)));
-    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1)));
-
-    //Run the thread 4th time. One of the jobs is failed and so the dag is 
failed and all state is cleaned up.
-    this._dagManagerThread.run();
-    Assert.assertEquals(this.dags.size(), 0);
-    Assert.assertEquals(this.jobToDag.size(), 0);
-    Assert.assertEquals(this.dagToJobs.size(), 0);
+  public void testFailedDag() throws URISyntaxException, IOException {
+    for (String failureOption: Lists.newArrayList("FINISH_RUNNING", 
"FINISH_ALL_POSSIBLE")) {
+      long flowExecutionId = System.currentTimeMillis();
+      String flowGroupId = "0";
+      String flowGroup = "group" + flowGroupId;
+      String flowName = "flow" + flowGroupId;
+      String jobName0 = "job0";
+      String jobName1 = "job1";
+      String jobName2 = "job2";
+
+      Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 
failureOption, false);
+      String dagId = DagManagerUtils.generateDagId(dag);
+
+      //Add a dag to the queue of dags
+      this.queue.offer(dag);
+      Iterator<JobStatus> jobStatusIterator1 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName0, String.valueOf(ExecutionStatus.RUNNING));
+      Iterator<JobStatus> jobStatusIterator2 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName0, String.valueOf(ExecutionStatus.COMPLETE));
+      Iterator<JobStatus> jobStatusIterator3 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName1, String.valueOf(ExecutionStatus.RUNNING));
+      Iterator<JobStatus> jobStatusIterator4 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName2, String.valueOf(ExecutionStatus.RUNNING));
+      Iterator<JobStatus> jobStatusIterator5 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName1, String.valueOf(ExecutionStatus.RUNNING));
+      Iterator<JobStatus> jobStatusIterator6 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName2, String.valueOf(ExecutionStatus.FAILED));
+      Iterator<JobStatus> jobStatusIterator7 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName1, String.valueOf(ExecutionStatus.COMPLETE));
+      Iterator<JobStatus> jobStatusIterator8 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName1, String.valueOf(ExecutionStatus.RUNNING));
+      Iterator<JobStatus> jobStatusIterator9 =
+          getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, 
jobName1, String.valueOf(ExecutionStatus.COMPLETE));
+
+
+      Mockito.when(_jobStatusRetriever
+          .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+          thenReturn(jobStatusIterator1).
+          thenReturn(jobStatusIterator2).
+          thenReturn(jobStatusIterator3).
+          thenReturn(jobStatusIterator4).
+          thenReturn(jobStatusIterator5).
+          thenReturn(jobStatusIterator6).
+          thenReturn(jobStatusIterator7).
+          thenReturn(jobStatusIterator8).
+          thenReturn(jobStatusIterator9);
+
+      //Run the thread once. Ensure the first job is running
+      this._dagManagerThread.run();
+      Assert.assertEquals(this.dags.size(), 1);
+      Assert.assertTrue(this.dags.containsKey(dagId));
+      Assert.assertEquals(this.jobToDag.size(), 1);
+      Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
+      Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+      
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+
+      //Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 
are submitted.
+      this._dagManagerThread.run();
+      Assert.assertEquals(this.dags.size(), 1);
+      Assert.assertTrue(this.dags.containsKey(dagId));
+      Assert.assertEquals(this.jobToDag.size(), 2);
+      Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(1)));
+      Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(2)));
+      Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
+      
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(1)));
+      
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(2)));
+
+      //Run the thread 3rd time. Ensure the job0 is complete and job1 and job2 
are running.
+      this._dagManagerThread.run();
+      Assert.assertEquals(this.dags.size(), 1);
+      Assert.assertTrue(this.dags.containsKey(dagId));
+      Assert.assertEquals(this.jobToDag.size(), 2);
+      Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(1)));
+      Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(2)));
+      Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
+      
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(1)));
+      
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(2)));
+
+      //Run the thread 4th time.
+      this._dagManagerThread.run();
+
+      if ("FINISH_RUNNING".equals(failureOption)) {
+        //One of the jobs is failed; so the dag is failed and all state is 
cleaned up.
+        Assert.assertEquals(this.dags.size(), 0);
+        Assert.assertEquals(this.jobToDag.size(), 0);
+        Assert.assertEquals(this.dagToJobs.size(), 0);
+        Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
+      } else {
+        //One of the jobs is failed; but with finish_all_possible, some jobs 
can continue running.
+        for (int i = 0; i < 3; i++) {
+          Assert.assertEquals(this.dags.size(), 1);
+          Assert.assertEquals(this.jobToDag.size(), 1);
+          Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+          this._dagManagerThread.run();
+        }
+        //Ensure the state is cleaned up.
+        Assert.assertEquals(this.dags.size(), 0);
+        Assert.assertEquals(this.jobToDag.size(), 0);
+        Assert.assertEquals(this.dagToJobs.size(), 0);
+        Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
+      }
+    }
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
deleted file mode 100644
index e97d860..0000000
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.service.modules.orchestration;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.junit.Assert;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-
-import static org.testng.Assert.*;
-
-
-public class DagManagerUtilsTest {
-
-  /**
-   * Create a {@link Dag <JobExecutionPlan>} with 2 parents and 1 child (i.e. 
a V-shaped dag).
-   * @return a Dag.
-   */
-  public Dag<JobExecutionPlan> buildDag() throws URISyntaxException {
-    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
-    Config baseConfig = ConfigBuilder.create().
-        addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group0").
-        addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow0").
-        addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
System.currentTimeMillis()).
-        addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group0").build();
-    for (int i = 0; i < 3; i++) {
-      String suffix = Integer.toString(i);
-      Config jobConfig = baseConfig.withValue(ConfigurationKeys.JOB_NAME_KEY, 
ConfigValueFactory.fromAnyRef("job" + suffix));
-      if (i == 2) {
-        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0,job1"));
-      }
-      JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
-          withTemplate(new URI("job" + suffix)).build();
-      SpecExecutor specExecutor = 
InMemorySpecExecutor.createDummySpecExecutor(new URI("job" + i));
-      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
-      jobExecutionPlans.add(jobExecutionPlan);
-    }
-    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
-  }
-
-  @Test
-  public void testGetNext() throws URISyntaxException {
-    Dag<JobExecutionPlan> dag = buildDag();
-
-    Set<Dag.DagNode<JobExecutionPlan>> dagNodeSet = 
DagManagerUtils.getNext(dag);
-    Assert.assertEquals(dagNodeSet.size(), 2);
-
-    //Set 1st job to complete and 2nd job running state
-    JobExecutionPlan jobExecutionPlan1 = dag.getNodes().get(0).getValue();
-    jobExecutionPlan1.setExecutionStatus(ExecutionStatus.COMPLETE);
-    JobExecutionPlan jobExecutionPlan2 = dag.getNodes().get(1).getValue();
-    jobExecutionPlan2.setExecutionStatus(ExecutionStatus.RUNNING);
-
-    //No new job to run
-    dagNodeSet = DagManagerUtils.getNext(dag);
-    Assert.assertEquals(dagNodeSet.size(), 0);
-
-    //Set 2nd job to complete; we must have 3rd job to run next
-    jobExecutionPlan2.setExecutionStatus(ExecutionStatus.COMPLETE);
-    dagNodeSet = DagManagerUtils.getNext(dag);
-    Assert.assertEquals(dagNodeSet.size(), 1);
-
-    //Set the 3rd job to running state, no new jobs to run
-    JobExecutionPlan jobExecutionPlan3 = dag.getNodes().get(2).getValue();
-    jobExecutionPlan3.setExecutionStatus(ExecutionStatus.RUNNING);
-    dagNodeSet = DagManagerUtils.getNext(dag);
-    Assert.assertEquals(dagNodeSet.size(), 0);
-
-    //Set the 3rd job to complete; no new jobs to run
-    jobExecutionPlan3.setExecutionStatus(ExecutionStatus.COMPLETE);
-    dagNodeSet = DagManagerUtils.getNext(dag);
-    Assert.assertEquals(dagNodeSet.size(), 0);
-
-
-    dag = buildDag();
-    dagNodeSet = DagManagerUtils.getNext(dag);
-    Assert.assertEquals(dagNodeSet.size(), 2);
-    //Set 1st job to failed; no new jobs to run
-    jobExecutionPlan1 = dag.getNodes().get(0).getValue();
-    jobExecutionPlan1.setExecutionStatus(ExecutionStatus.FAILED);
-    dagNodeSet = DagManagerUtils.getNext(dag);
-    Assert.assertEquals(dagNodeSet.size(), 0);
-  }
-
-
-}
\ No newline at end of file

Reply via email to