[GOBBLIN-590] Implement Workflow Manager in Gobblin-as-a-Service (GaaS).

Closes #2456 from sv2000/workflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/71184cdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/71184cdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/71184cdb

Branch: refs/heads/master
Commit: 71184cdb1bb35ac5874fffd1415ca632c6ee92c0
Parents: 391615e
Author: sv2000 <[email protected]>
Authored: Tue Oct 2 09:23:34 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Oct 2 09:23:34 2018 -0700

----------------------------------------------------------------------
 .../gobblin/service/ServiceConfigKeys.java      |   3 +-
 .../gobblin/metrics/event/TimingEvent.java      |   5 +
 .../apache/gobblin/service/FlowStatusTest.java  |  19 +-
 .../modules/core/GobblinServiceManager.java     |  41 +-
 .../service/modules/flow/FlowGraphPath.java     |   2 -
 .../gobblin/service/modules/flowgraph/Dag.java  |   5 +-
 .../modules/orchestration/DagManager.java       | 440 +++++++++++++++++++
 .../modules/orchestration/DagManagerUtils.java  | 109 +++++
 .../modules/orchestration/DagStateStore.java    |  53 +++
 .../modules/orchestration/FSDagStateStore.java  | 148 +++++++
 .../modules/orchestration/Orchestrator.java     |  92 ++--
 .../scheduler/GobblinServiceJobScheduler.java   |  14 +-
 .../service/modules/spec/JobExecutionPlan.java  |  10 +-
 .../spec/JobExecutionPlanListDeserializer.java  | 108 +++++
 .../spec/JobExecutionPlanListSerializer.java    |  90 ++++
 .../modules/spec/SerializationConstants.java    |  33 ++
 .../core/IdentityFlowToJobSpecCompilerTest.java |   4 +-
 .../service/modules/flowgraph/DagTest.java      |   8 +-
 .../modules/orchestration/DagManagerTest.java   | 276 ++++++++++++
 .../orchestration/DagManagerUtilsTest.java      | 117 +++++
 .../orchestration/FSDagStateStoreTest.java      | 154 +++++++
 .../modules/orchestration/OrchestratorTest.java |   2 +-
 22 files changed, 1660 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index c9b034c..255b1f7 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -32,6 +32,7 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
   public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY 
= GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
   public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
+  public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
 
   // Helix / ServiceScheduler Keys
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + 
"helix.cluster.name";
@@ -113,5 +114,5 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = 
"log4j-service.properties";
   // GAAS Listerning Port
   public static final String SERVICE_PORT = GOBBLIN_SERVICE_PREFIX + "port";
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index b00465f..71dd947 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -58,6 +58,11 @@ public class TimingEvent {
     public static final String FLOW_COMPILED = "FlowCompiled";
   }
 
+  public static class JobStatusTimings {
+    public static final String JOB_STATUS_POLLED = "JobStatusPolled";
+    public static final String ALL_JOB_STATUSES_POLLED = 
"AllJobStatusesPolled";
+  }
+
   public static class FlowEventConstants {
     public static final String FLOW_NAME_FIELD = "flowName";
     public static final String FLOW_GROUP_FIELD = "flowGroup";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index d5e857e..b8592ad 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -40,7 +41,7 @@ import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 
-@Test(groups = { "gobblin.service" }, singleThreaded=true)
+@Test(groups = { "gobblin.service" }, singleThreaded = true)
 public class FlowStatusTest {
   private FlowStatusClient _client;
   private EmbeddedRestliServer _server;
@@ -51,7 +52,21 @@ public class FlowStatusTest {
     @Override
     public Iterator<org.apache.gobblin.service.monitoring.JobStatus> 
getJobStatusesForFlowExecution(String flowName,
         String flowGroup, long flowExecutionId) {
-      return _listOfJobStatusLists.get((int)flowExecutionId).iterator();
+      return _listOfJobStatusLists.get((int) flowExecutionId).iterator();
+    }
+
+    @Override
+    public Iterator<org.apache.gobblin.service.monitoring.JobStatus> 
getJobStatusesForFlowExecution(String flowName, String flowGroup,
+        long flowExecutionId, String jobGroup, String jobName) {
+      Iterator<org.apache.gobblin.service.monitoring.JobStatus> 
jobStatusIterator = getJobStatusesForFlowExecution(flowName, flowGroup, 
flowExecutionId);
+      List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = 
new ArrayList<>();
+      while (jobStatusIterator.hasNext()) {
+        org.apache.gobblin.service.monitoring.JobStatus jobStatus = 
jobStatusIterator.next();
+        if (jobStatus.getJobGroup().equals(jobGroup) && 
jobStatus.getJobName().equals(jobName)) {
+          jobStatusList.add(jobStatus);
+        }
+      }
+      return jobStatusList.iterator();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 8d934ce..243f7e6 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -85,6 +85,7 @@ import org.apache.gobblin.service.FlowConfigsV2Resource;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
@@ -116,6 +117,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   protected final boolean isRestLIServerEnabled;
   protected final boolean isTopologySpecFactoryEnabled;
   protected final boolean isGitConfigMonitorEnabled;
+  protected final boolean isDagManagerEnabled;
 
   protected TopologyCatalog topologyCatalog;
   @Getter
@@ -138,6 +140,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
   protected GitConfigMonitor gitConfigMonitor;
 
+  protected DagManager dagManager;
+
   @Getter
   protected Config config;
   private final MetricContext metricContext;
@@ -159,8 +163,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     this.serviceLauncher = new ServiceBasedAppLauncher(properties, 
serviceName);
 
     this.fs = buildFileSystem(config);
-    this.serviceWorkDir = serviceWorkDirOptional.isPresent() ? 
serviceWorkDirOptional.get() :
-        getServiceWorkDirPath(this.fs, serviceName, serviceId);
+    this.serviceWorkDir = serviceWorkDirOptional.isPresent() ? 
serviceWorkDirOptional.get()
+        : getServiceWorkDirPath(this.fs, serviceName, serviceId);
 
     // Initialize TopologyCatalog
     this.isTopologyCatalogEnabled = ConfigUtils.getBoolean(config,
@@ -182,14 +186,24 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config,
           ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, 
false);
 
+      this.isDagManagerEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
+
       if (this.isGitConfigMonitorEnabled) {
         this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
         this.serviceLauncher.addService(this.gitConfigMonitor);
       }
+
+      if (this.isDagManagerEnabled) {
+        this.dagManager = new DagManager(config);
+        this.serviceLauncher.addService(this.dagManager);
+      }
     } else {
       this.isGitConfigMonitorEnabled = false;
+      this.isDagManagerEnabled = false;
     }
 
+
+
     // Initialize Helix
     Optional<String> zkConnectionString = 
Optional.fromNullable(ConfigUtils.getString(config,
         ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, null));
@@ -206,7 +220,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
         ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
     if (isSchedulerEnabled) {
-      this.orchestrator = new Orchestrator(config, 
Optional.of(this.topologyCatalog), Optional.of(LOGGER));
+      this.orchestrator = new Orchestrator(config, 
Optional.of(this.topologyCatalog), Optional.fromNullable(this.dagManager), 
Optional.of(LOGGER));
       SchedulerService schedulerService = new SchedulerService(properties);
 
       this.scheduler = new GobblinServiceJobScheduler(this.serviceName, 
config, this.helixManager,
@@ -327,6 +341,10 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       if (this.isGitConfigMonitorEnabled) {
         this.gitConfigMonitor.setActive(true);
       }
+
+      if (this.isDagManagerEnabled) {
+        this.dagManager.setActive(true);
+      }
     } else if (this.helixManager.isPresent()) {
       LOGGER.info("Leader lost notification for {} HM.isLeader {}", 
this.helixManager.get().getInstanceName(),
           this.helixManager.get().isLeader());
@@ -338,6 +356,10 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       if (this.isGitConfigMonitorEnabled) {
         this.gitConfigMonitor.setActive(false);
       }
+
+      if (this.isDagManagerEnabled) {
+        this.dagManager.setActive(false);
+      }
     }
   }
 
@@ -371,6 +393,11 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
         if (this.isGitConfigMonitorEnabled) {
           this.gitConfigMonitor.setActive(true);
         }
+
+        if (this.isDagManagerEnabled) {
+          this.dagManager.setActive(true);
+        }
+
       } else {
         if (this.isSchedulerEnabled) {
           LOGGER.info("[Init] Gobblin Service is running in slave instance 
mode, not enabling Scheduler.");
@@ -385,6 +412,11 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       if (this.isGitConfigMonitorEnabled) {
         this.gitConfigMonitor.setActive(true);
       }
+
+      if (this.isDagManagerEnabled) {
+        this.dagManager.setActive(true);
+      }
+
     }
 
     // Populate TopologyCatalog with all Topologies generated by 
TopologySpecFactory
@@ -479,7 +511,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     private ContextAwareHistogram serviceLeadershipChange;
 
     public Metrics(final MetricContext metricContext, Config config) {
-      int timeWindowSizeInMinutes = ConfigUtils.getInt(config, 
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, 
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+      int timeWindowSizeInMinutes = ConfigUtils.getInt(config, 
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+          ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
       this.serviceLeadershipChange = 
metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 
timeWindowSizeInMinutes, TimeUnit.MINUTES);
       this.contextAwareMetrics.add(this.serviceLeadershipChange);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
index 1b7ce11..4b81f1f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -17,13 +17,11 @@
 
 package org.apache.gobblin.service.modules.flow;
 
-import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
 import lombok.Getter;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 3606897..b41d1d0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.flowgraph;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -79,11 +80,11 @@ public class Dag<T> {
   }
 
   public List<DagNode<T>> getChildren(DagNode node) {
-    return parentChildMap.getOrDefault(node, null);
+    return parentChildMap.getOrDefault(node, Collections.EMPTY_LIST);
   }
 
   public List<DagNode<T>> getParents(DagNode node) {
-    return node.parentNodes;
+    return (node.parentNodes != null)? node.parentNodes : 
Collections.EMPTY_LIST;
   }
 
   public boolean isEmpty() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
new file mode 100644
index 0000000..ac304fb
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * This class implements a manager to manage the life cycle of a {@link Dag}. 
A {@link Dag} is submitted to the
+ * {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. On 
receiving a {@link Dag}, the
+ * {@link DagManager} first persists the {@link Dag} to the {@link 
DagStateStore}, and then submits it to a {@link BlockingQueue}.
+ * This guarantees that each {@link Dag} received by the {@link DagManager} 
can be recovered in case of a leadership
+ * change or service restart.
+ *
+ * The implementation of the {@link DagManager} is multi-threaded. Each {@link 
DagManagerThread} polls the
{@link BlockingQueue} for new Dag submissions at fixed intervals. It dequeues 
any newly submitted Dags and coordinates
+ * the execution of individual jobs in the Dag. The coordination logic 
involves polling the {@link JobStatus}es of running
+ * jobs. Upon completion of a job, it will either schedule the next job in the 
Dag (on SUCCESS) or mark the Dag as failed
+ * (on FAILURE). Upon completion of a Dag execution, it will perform the 
required clean up actions.
+ *
+ * The {@link DagManager} is active only in the leader mode. To ensure recoverability, each 
{@link Dag} managed by a {@link DagManager} is
+ * checkpointed to a persistent location. On start up or leadership change,
+ * the {@link DagManager} loads all the checkpointed {@link Dag}s and adds 
them to the {@link  BlockingQueue}.
+ * Current implementation supports only FileSystem-based checkpointing of the 
Dag statuses.
+ */
+@Alpha
+@Slf4j
+public class DagManager extends AbstractIdleService {
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer TERMINATION_TIMEOUT = 30;
+
+  public static final String DAG_MANAGER_PREFIX = 
"gobblin.service.dagManager.";
+  public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + 
"numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
DAG_MANAGER_PREFIX + "pollingInterval";
+  public static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + 
"jobStatusRetriever";
+  public static final String DAG_STORE_CLASS_KEY = DAG_MANAGER_PREFIX + 
"dagStateStoreClass";
+  public static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + 
"dagStateStoreDir";
+
+  private BlockingQueue<Dag<JobExecutionPlan>> queue;
+  private ScheduledExecutorService scheduledExecutorPool;
+  private boolean instrumentationEnabled;
+
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final JobStatusRetriever jobStatusRetriever;
+  private final DagStateStore dagStateStore;
+  private volatile boolean isActive = false;
+
+  public DagManager(Config config, boolean instrumentationEnabled) {
+    this.queue = new LinkedBlockingDeque<>();
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.instrumentationEnabled = instrumentationEnabled;
+
+    try {
+      Class jobStatusRetrieverClass = 
Class.forName(config.getString(JOB_STATUS_RETRIEVER_KEY));
+      this.jobStatusRetriever =
+          (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass);
+      Class dagStateStoreClass = 
Class.forName(config.getString(DAG_STORE_CLASS_KEY));
+      this.dagStateStore = (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Exception encountered during DagManager 
initialization", e);
+    }
+  }
+
+  public DagManager(Config config) {
+    this(config, true);
+  }
+
+  /** Start the service. On startup, the service launches a fixed pool of 
{@link DagManagerThread}s, which are scheduled at
+   * fixed intervals. The service also loads any checkpointed {@link Dag}s when it becomes active.
+   */
+  @Override
+  protected void startUp() {
+    //On startup, the service creates tasks that are scheduled at a fixed rate.
+    for (int i = 0; i < numThreads; i++) {
+      this.scheduledExecutorPool.scheduleAtFixedRate(new 
DagManagerThread(jobStatusRetriever, dagStateStore, queue, 
instrumentationEnabled), 0, this.pollingInterval,
+          TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Method to submit a {@link Dag} to the {@link DagManager}. The {@link 
DagManager} first persists the
+   * submitted dag to the {@link DagStateStore} and then adds the dag to a 
{@link BlockingQueue} to be picked up
+   * by one of the {@link DagManagerThread}s.
+   */
+  public synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException 
{
+    //Persist the dag
+    this.dagStateStore.writeCheckpoint(dag);
+    //Add it to the queue of dags
+    if (!this.queue.offer(dag)) {
+      throw new IOException("Could not add dag" + 
DagManagerUtils.generateDagId(dag) + "to queue");
+    }
+  }
+
+  /**
+   * When a {@link DagManager} becomes active, it loads the serialized 
representations of the currently running {@link Dag}s
+   * from the checkpoint directory, deserializes the {@link Dag}s and adds 
them to a queue to be consumed by
+   * the {@link DagManagerThread}s.
+   * @param active a boolean to indicate if the {@link DagManager} is the 
leader.
+   */
+  public synchronized void setActive(boolean active) {
+    this.isActive = active;
+    try {
+      if (this.isActive) {
+        for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
+          offer(dag);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Exception encountered when activating the 
new DagManager", e);
+    }
+  }
+
+  /**
+   * Each {@link DagManagerThread} performs 2 actions when scheduled:
+   * <ol>
+   *   <li> Dequeues any newly submitted {@link Dag}s from the Dag queue. All 
the {@link JobExecutionPlan}s which
+   *   are part of the dequeued {@link Dag} will be managed by this thread. </li>
+   *   <li> Polls the job status store for the current job statuses of all the 
running jobs it manages.</li>
+   * </ol>
+   */
+  public static class DagManagerThread implements Runnable {
+    private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+    private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+    private final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> 
dagToJobs = new HashMap<>();
+    private final Set<String> failedDagIds = new HashSet<>();
+    private final MetricContext metricContext;
+    private final Optional<EventSubmitter> eventSubmitter;
+
+    private JobStatusRetriever jobStatusRetriever;
+    private DagStateStore dagStateStore;
+    private BlockingQueue<Dag<JobExecutionPlan>> queue;
+
+    /**
+     * Constructor.
+     */
+    public DagManagerThread(JobStatusRetriever jobStatusRetriever, 
DagStateStore dagStateStore, BlockingQueue<Dag<JobExecutionPlan>> queue,
+        boolean instrumentationEnabled) {
+      this.jobStatusRetriever = jobStatusRetriever;
+      this.dagStateStore = dagStateStore;
+      this.queue = queue;
+      if (instrumentationEnabled) {
+        this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+        this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
+      } else {
+        this.metricContext = null;
+        this.eventSubmitter = Optional.absent();
+      }
+    }
+
+    /**
+     * Main body of the {@link DagManagerThread}.
+     */
+    @Override
+    public void run() {
+      try {
+        Object nextItem = queue.poll();
+        //Poll the queue for a new Dag to execute.
+        if (nextItem != null) {
+          Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) nextItem;
+
+          //Initialize dag.
+          initialize(dag);
+        }
+        log.info("Polling job statuses..");
+        TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent()
+            ? 
eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.ALL_JOB_STATUSES_POLLED)
+            : null;
+        //Poll and update the job statuses of running jobs.
+        pollJobStatuses();
+        log.info("Poll done.");
+        if (jobStatusPollTimer != null) {
+          jobStatusPollTimer.stop();
+        }
+        //Clean up any finished dags
+        log.info("Cleaning up finished dags..");
+        cleanUp();
+        log.info("Clean up done");
+      } catch (Exception e) {
+        log.error("Exception encountered in {}", getClass().getName(), e);
+      }
+    }
+
+    /**
+     * This method determines the next set of jobs to execute from the dag and 
submits them for execution.
+     * This method updates internal data structures tracking currently running 
Dags and jobs.
+     */
+    private void initialize(Dag<JobExecutionPlan> dag)
+        throws IOException {
+      //Add Dag to the map of running dags
+      String dagId = DagManagerUtils.generateDagId(dag);
+      log.info("Initializing Dag {}", dagId);
+      if (this.dags.containsKey(dagId)) {
+        log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
+        return;
+      }
+
+      this.dags.put(dagId, dag);
+      log.info("Dag {} - determining if any jobs are already running.");
+      //Are there any jobs already in the running state? This check is for 
Dags already running
+      //before a leadership change occurs.
+      for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        if (DagManagerUtils.getExecutionStatus(dagNode) == 
ExecutionStatus.RUNNING) {
+          addJobState(dagId, dagNode);
+        }
+      }
+      log.info("Dag {} submitting jobs ready for execution.");
+      //Determine the next set of jobs to run and submit them for execution
+      submitNext(dagId);
+      log.info("Dag {} Initialization complete.");
+    }
+
+    /**
+     * Poll the statuses of running jobs and update the {@link ExecutionStatus} of each
+     * corresponding {@link JobExecutionPlan}, invoking onJobFinish for completed or failed jobs.
+     */
+    private void pollJobStatuses()
+        throws IOException {
+      this.failedDagIds.clear();
+      for (Dag.DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
+        TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent()
+            ? 
eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.JOB_STATUS_POLLED)
+            : null;
+        JobStatus jobStatus = pollJobStatus(node);
+        if (jobStatusPollTimer != null) {
+          jobStatusPollTimer.stop();
+        }
+        Preconditions.checkNotNull(jobStatus, "Received null job status for a 
running job " + DagManagerUtils.getJobName(node));
+        JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
+        //TODO: This will be updated when JobStatus schema provides the 
correct execution status.
+        //Currently, it is a placeholder.
+        switch (jobStatus.getEventName()) {
+          case TimingEvent.LauncherTimings.JOB_COMPLETE:
+            jobExecutionPlan.setExecutionStatus(ExecutionStatus.COMPLETE);
+            onJobFinish(node);
+            break;
+          case TimingEvent.LauncherTimings.JOB_FAILED:
+            jobExecutionPlan.setExecutionStatus(ExecutionStatus.FAILED);
+            onJobFinish(node);
+            break;
+          default:
+            jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+            break;
+        }
+      }
+    }
+
+    /**
+     * Retrieve the latest {@link JobStatus} for the job represented by the given dag node, via the {@link JobStatusRetriever}.
+     */
+    private JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
+      Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+      String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+      String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+      Iterator<JobStatus> jobStatusIterator =
+          this.jobStatusRetriever.getJobStatusesForFlowExecution(flowGroup, 
flowName, flowExecutionId, jobGroup, jobName);
+      if (jobStatusIterator.hasNext()) {
+        return jobStatusIterator.next();
+      } else {
+        return null;
+      }
+    }
+
+    public void submitNext(String dagId)
+        throws IOException {
+      Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+      //Submit jobs from the dag ready for execution.
+      for (Dag.DagNode<JobExecutionPlan> dagNode : 
DagManagerUtils.getNext(dag)) {
+        submitJob(dagNode);
+        addJobState(dagId, dagNode);
+      }
+      //Checkpoint the dag state
+      this.dagStateStore.writeCheckpoint(dag);
+    }
+
+    /**
+     * Submits a {@link JobSpec} to a {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
+     */
+    private void submitJob(Dag.DagNode<JobExecutionPlan> dagNode) {
+      JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
+      jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+      JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+
+      // Run this spec on selected executor
+      SpecProducer producer = null;
+      try {
+        producer = DagManagerUtils.getSpecProducer(dagNode);
+        Config jobConfig = DagManagerUtils.getJobConfig(dagNode);
+        if (!jobConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+          log.warn("JobSpec does not contain flowExecutionId.");
+        }
+        log.info("Submitting job: {} on executor: {}", jobSpec, producer);
+        producer.addSpec(jobSpec);
+      } catch (Exception e) {
+        log.error("Cannot submit job: {} on executor: {}", jobSpec, producer, 
e);
+      }
+    }
+
+    /**
+     * Method that defines the actions to be performed when a job finishes 
either successfully or with failure.
+     * This method updates the state of the dag and performs clean up actions 
as necessary.
+     */
+    private void onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode)
+        throws IOException {
+      Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
+      String dagId = DagManagerUtils.generateDagId(dag);
+      String jobName = DagManagerUtils.getJobName(dagNode);
+      ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
+      log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, 
jobStatus.name());
+
+      deleteJobState(dagId, dagNode);
+
+      if (jobStatus == ExecutionStatus.COMPLETE) {
+        submitNext(dagId);
+      } else if (jobStatus == ExecutionStatus.FAILED) {
+        this.failedDagIds.add(dagId);
+      }
+    }
+
+    private void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode) {
+      this.jobToDag.remove(dagNode);
+      this.dagToJobs.get(dagId).remove(dagNode);
+    }
+
+    private void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode) {
+      Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+      this.jobToDag.put(dagNode, dag);
+      if (this.dagToJobs.containsKey(dagId)) {
+        this.dagToJobs.get(dagId).add(dagNode);
+      } else {
+        LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
Lists.newLinkedList();
+        dagNodeList.add(dagNode);
+        this.dagToJobs.put(dagId, dagNodeList);
+      }
+    }
+
+    private boolean hasRunningJobs(String dagId) {
+      return !this.dagToJobs.get(dagId).isEmpty();
+    }
+
+    /**
+     * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
+     */
+    private void cleanUp() {
+      //Clean up failed dags
+      for (String dagId : this.failedDagIds) {
+        //Skip monitoring of any other jobs of the failed dag.
+        LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
+        while (!dagNodeList.isEmpty()) {
+          Dag.DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+          deleteJobState(dagId, dagNode);
+        }
+        log.info("Dag {} has finished with status FAILED; Cleaning up dag from 
the state store.", dagId);
+        cleanUpDag(dagId);
+      }
+
+      //Clean up successfully completed dags
+      for (String dagId : this.dags.keySet()) {
+        if (!hasRunningJobs(dagId)) {
+          log.info("Dag {} has finished with status COMPLETE; Cleaning up dag 
from the state store.", dagId);
+          cleanUpDag(dagId);
+        }
+      }
+    }
+
+    private void cleanUpDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+      this.dagToJobs.remove(dagId);
+      this.dags.remove(dagId);
+      this.dagStateStore.cleanUp(dag);
+    }
+  }
+
+  /** Stop the service. */
+  @Override
+  protected void shutDown()
+      throws Exception {
+    this.scheduledExecutorPool.shutdown();
+    this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, 
TimeUnit.SECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
new file mode 100644
index 0000000..348aa78
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+public class DagManagerUtils {
+  /**
+   * Generate a dagId from the given {@link Dag} instance.
+   * @param dag instance of a {@link Dag}.
+   * @return a String id associated corresponding to the {@link Dag} instance.
+   */
+  public static String generateDagId(Dag<JobExecutionPlan> dag) {
+    Config jobConfig = 
dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    Long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
+  }
+
+  public static String getJobName(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return 
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
+  }
+
+  public static JobExecutionPlan 
getJobExecutionPlan(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return dagNode.getValue();
+  }
+
+  public static JobSpec getJobSpec(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return dagNode.getValue().getJobSpec();
+  }
+
+  public static Config getJobConfig(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return dagNode.getValue().getJobSpec().getConfig();
+  }
+
+  public static SpecProducer getSpecProducer(Dag.DagNode<JobExecutionPlan> 
dagNode)
+      throws ExecutionException, InterruptedException {
+    return dagNode.getValue().getSpecExecutor().getProducer().get();
+  }
+
+  public static ExecutionStatus 
getExecutionStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return dagNode.getValue().getExecutionStatus();
+  }
+
+  /**
+   * Traverse the dag to determine the next set of nodes to be executed. It 
starts with the startNodes of the dag and
+   * identifies each node yet to be executed and for which each of its parent 
nodes is in the {@link ExecutionStatus#COMPLETE}
+   * state.
+   */
+  public static Set<Dag.DagNode<JobExecutionPlan>> 
getNext(Dag<JobExecutionPlan> dag) {
+    Set<Dag.DagNode<JobExecutionPlan>> nextNodesToExecute = new HashSet<>();
+    LinkedList<Dag.DagNode<JobExecutionPlan>> nodesToExpand = 
Lists.newLinkedList(dag.getStartNodes());
+
+    while (!nodesToExpand.isEmpty()) {
+      Dag.DagNode<JobExecutionPlan> node = nodesToExpand.poll();
+      boolean addFlag = true;
+      if (getExecutionStatus(node) == ExecutionStatus.$UNKNOWN) {
+        //Add a node to be executed next, only if all of its parent nodes are 
COMPLETE.
+        List<Dag.DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
+        for (Dag.DagNode<JobExecutionPlan> parentNode : parentNodes) {
+          if (getExecutionStatus(parentNode) != ExecutionStatus.COMPLETE) {
+            addFlag = false;
+            break;
+          }
+        }
+        if (addFlag) {
+          nextNodesToExecute.add(node);
+        }
+      } else if (getExecutionStatus(node) == ExecutionStatus.COMPLETE) {
+        //Explore the children of COMPLETED node as next candidates for 
execution.
+        nodesToExpand.addAll(dag.getChildren(node));
+      } else {
+        return new HashSet<>();
+      }
+    }
+    return nextNodesToExecute;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
new file mode 100644
index 0000000..0984a91
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface for storing and retrieving currently running {@link 
Dag<JobExecutionPlan>}s. In case of a leadership
+ * change in the {@link 
org.apache.gobblin.service.modules.core.GobblinServiceManager}, the 
corresponding {@link DagManager}
+ * loads the running {@link Dag}s from the {@link DagStateStore} to resume 
their execution.
+ */
+@Alpha
+public interface DagStateStore {
+  /**
+   * Persist the {@link Dag} to the backing store.
+   * @param dag
+   */
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dag
+   */
+  public void cleanUp(Dag<JobExecutionPlan> dag);
+
+  /**
+   * Load all currently running {@link Dag}s from the underlying store. 
Typically, invoked when a new {@link DagManager}
+   * takes over or on restart of service.
+   * @return a {@link List} of currently running {@link Dag}s.
+   */
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
new file mode 100644
index 0000000..0720210
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+
+
+@Alpha
+@Slf4j
+public class FSDagStateStore implements DagStateStore {
+  public static final String DAG_FILE_EXTENSION = ".dag";
+  /** Use gson for ser/de */
+  //private static final Gson GSON = 
GsonInterfaceAdapter.getGson(Object.class);
+
+  /** Type token for ser/de JobExecutionPlan list */
+  private static final Type LIST_JOBEXECUTIONPLAN_TYPE = new 
TypeToken<List<JobExecutionPlan>>(){}.getType();
+
+  private final String dagCheckpointDir;
+  private final Gson gson;
+
+  public FSDagStateStore(Config config) {
+    this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
+    JsonSerializer<List<JobExecutionPlan>> serializer = new 
JobExecutionPlanListSerializer();
+    JsonDeserializer<List<JobExecutionPlan>> deserializer = new 
JobExecutionPlanListDeserializer();
+    this.gson = new 
GsonBuilder().registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, serializer)
+        .registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, 
deserializer).create();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void writeCheckpoint(Dag<JobExecutionPlan> dag) throws 
IOException {
+    // write to a temporary name then rename to make the operation atomic when 
the file system allows a file to be
+    // replaced
+    String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION;
+    String serializedDag = serializeDag(dag);
+
+    File checkpointDir = new File(this.dagCheckpointDir);
+    if (!checkpointDir.exists()) {
+      if (!checkpointDir.mkdirs()) {
+        throw new IOException("Could not create dir - " + 
this.dagCheckpointDir);
+      }
+    }
+
+    File tmpCheckpointFile = new File(this.dagCheckpointDir, fileName + 
".tmp");
+    File checkpointFile = new File(this.dagCheckpointDir, fileName);
+
+    Files.write(serializedDag, tmpCheckpointFile, Charsets.UTF_8);
+    Files.move(tmpCheckpointFile, checkpointFile);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void cleanUp(Dag<JobExecutionPlan> dag) {
+    String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION;
+
+    //Delete the dag checkpoint file from the checkpoint directory
+    File checkpointFile = new File(this.dagCheckpointDir, fileName);
+    if (!checkpointFile.delete()) {
+      log.error("Could not delete checkpoint file: {}", 
checkpointFile.getName());
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    List<Dag<JobExecutionPlan>> runningDags = Lists.newArrayList();
+    File dagCheckpointFolder = new File(this.dagCheckpointDir);
+    for (File file : dagCheckpointFolder.listFiles((dir, name) -> 
name.endsWith(DAG_FILE_EXTENSION))) {
+        runningDags.add(getDag(file));
+    }
+    return runningDags;
+  }
+
+  /**
+   * Return a {@link Dag} given a file name.
+   * @param dagFile
+   * @return the {@link Dag} associated with the dagFiel.
+   */
+  @VisibleForTesting
+  public Dag<JobExecutionPlan> getDag(File dagFile) throws IOException {
+    String serializedDag = Files.toString(dagFile, Charsets.UTF_8);
+    return deserializeDag(serializedDag);
+  }
+
+  /**
+   * Serialize a {@link Dag<JobExecutionPlan>}.
+   * @param dag A Dag parametrized by type {@link JobExecutionPlan}.
+   * @return a JSON string representation of the Dag object.
+   */
+  private String serializeDag(Dag<JobExecutionPlan> dag) {
+    List<JobExecutionPlan> jobExecutionPlanList = 
dag.getNodes().stream().map(Dag.DagNode::getValue).collect(Collectors.toList());
+    return gson.toJson(jobExecutionPlanList, LIST_JOBEXECUTIONPLAN_TYPE);
+  }
+
+  /**
+   * De-serialize a Dag.
+   * @param jsonDag A string representation of a Dag.
+   * @return a {@link Dag} parametrized by {@link JobExecutionPlan}.
+   */
+  private Dag<JobExecutionPlan> deserializeDag(String jsonDag) {
+    return new JobExecutionPlanDagFactory().createDag(gson.fromJson(jsonDag, 
LIST_JOBEXECUTIONPLAN_TYPE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 1f7f737..ea0d0bd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -75,6 +75,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   protected final Logger _log;
   protected final SpecCompiler specCompiler;
   protected final Optional<TopologyCatalog> topologyCatalog;
+  protected final Optional<DagManager> dagManager;
 
   protected final MetricContext metricContext;
   protected final Optional<EventSubmitter> eventSubmitter;
@@ -87,7 +88,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   private final ClassAliasResolver<SpecCompiler> aliasResolver;
 
-  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<Logger> log, boolean instrumentationEnabled) {
+  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
+      boolean instrumentationEnabled) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     if (instrumentationEnabled) {
       this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
IdentityFlowToJobSpecCompiler.class);
@@ -95,8 +97,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       this.flowOrchestrationFailedMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
       this.flowOrchestrationTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
       this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
-    }
-    else {
+    } else {
       this.metricContext = null;
       this.flowOrchestrationSuccessFulMeter = Optional.absent();
       this.flowOrchestrationFailedMeter = Optional.absent();
@@ -106,6 +107,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
+    this.dagManager = dagManager;
     try {
       String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
       if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -121,25 +123,25 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     }
   }
 
-  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<Logger> log) {
-    this(config, topologyCatalog, log, true);
+  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) {
+    this(config, topologyCatalog, dagManager, log, true);
   }
 
-  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Logger log) {
-    this(config, topologyCatalog, Optional.of(log));
+  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Logger log) {
+    this(config, topologyCatalog, dagManager, Optional.of(log));
   }
 
   public Orchestrator(Config config, Logger log) {
-    this(config, Optional.<TopologyCatalog>absent(), Optional.of(log));
+    this(config, Optional.<TopologyCatalog>absent(), 
Optional.<DagManager>absent(), Optional.of(log));
   }
 
   /** Constructor with no logging */
   public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog) {
-    this(config, topologyCatalog, Optional.<Logger>absent());
+    this(config, topologyCatalog, Optional.<DagManager>absent(), 
Optional.<Logger>absent());
   }
 
   public Orchestrator(Config config) {
-    this(config, Optional.<TopologyCatalog>absent(), 
Optional.<Logger>absent());
+    this(config, Optional.<TopologyCatalog>absent(), 
Optional.<DagManager>absent(), Optional.<Logger>absent());
   }
 
   @VisibleForTesting
@@ -216,35 +218,39 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         flowCompilationTimer.stop(flowMetadata);
       }
 
-      // Schedule all compiled JobSpecs on their respective Executor
-      for (Dag.DagNode<JobExecutionPlan> dagNode: 
jobExecutionPlanDag.getNodes()) {
-        JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-
-        // Run this spec on selected executor
-        SpecProducer producer = null;
-        try {
-          producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-          Spec jobSpec = jobExecutionPlan.getJobSpec();
-
-          if 
(!((JobSpec)jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
-            _log.warn("JobSpec does not contain flowExecutionId.");
-          }
-
-          Map<String, String> jobMetadata = getJobMetadata(flowMetadata, 
jobExecutionPlan);
-          _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
-
-          TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent()
-              ? 
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED)
-              : null;
-
-          producer.addSpec(jobSpec);
-
-          if (jobOrchestrationTimer != null) {
-            jobOrchestrationTimer.stop(jobMetadata);
+      if (this.dagManager.isPresent()) {
+        //Send the dag to the DagManager.
+        this.dagManager.get().offer(jobExecutionPlanDag);
+      } else {
+        // Schedule all compiled JobSpecs on their respective Executor
+        for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
+          JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+
+          // Run this spec on selected executor
+          SpecProducer producer = null;
+          try {
+            producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+            Spec jobSpec = jobExecutionPlan.getJobSpec();
+
+            if (!((JobSpec) 
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+              _log.warn("JobSpec does not contain flowExecutionId.");
+            }
+
+            Map<String, String> jobMetadata = getJobMetadata(flowMetadata, 
jobExecutionPlan);
+            _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
+
+            TimingEvent jobOrchestrationTimer = 
this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
+                getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : 
null;
+
+            producer.addSpec(jobSpec);
+
+            if (jobOrchestrationTimer != null) {
+              jobOrchestrationTimer.stop(jobMetadata);
+            }
+          } catch (Exception e) {
+            _log.error("Cannot successfully setup spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer
+                + " for flow: " + spec, e);
           }
-        } catch(Exception e) {
-          _log.error("Cannot successfully setup spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer +
-              " for flow: " + spec, e);
         }
       }
     } else {
@@ -255,7 +261,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
-  private Map<String,String> getFlowMetadata(FlowSpec flowSpec) {
+  private Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
     Map<String, String> metadata = Maps.newHashMap();
 
     metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
@@ -267,7 +273,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     return metadata;
   }
 
-  private Map<String,String> getJobMetadata(Map<String,String> flowMetadata, 
JobExecutionPlan jobExecutionPlan) {
+  private Map<String, String> getJobMetadata(Map<String, String> flowMetadata, 
JobExecutionPlan jobExecutionPlan) {
     Map<String, String> jobMetadata = Maps.newHashMap();
     JobSpec jobSpec = jobExecutionPlan.getJobSpec();
     SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
@@ -306,9 +312,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
           _log.info(String.format("Going to delete JobSpec: %s on Executor: 
%s", jobSpec, producer));
           producer.deleteSpec(jobSpec.getUri(), headers);
-        } catch(Exception e) {
-          _log.error("Cannot successfully delete spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer +
-              " for flow: " + spec, e);
+        } catch (Exception e) {
+          _log.error("Cannot successfully delete spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer
+              + " for flow: " + spec, e);
         }
       }
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 1ec791a..ee4ecb7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -21,8 +21,6 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixManager;
@@ -57,6 +55,7 @@ import org.apache.gobblin.scheduler.BaseGobblinJob;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -95,10 +94,10 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   }
 
   public GobblinServiceJobScheduler(String serviceName, Config config, 
Optional<HelixManager> helixManager,
-      Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, SchedulerService schedulerService,
-      Optional<Logger> log)
+      Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager,
+      SchedulerService schedulerService, Optional<Logger> log)
       throws Exception {
-    this(serviceName, config, helixManager, flowCatalog, topologyCatalog, new 
Orchestrator(config, topologyCatalog, log),
+    this(serviceName, config, helixManager, flowCatalog, topologyCatalog, new 
Orchestrator(config, topologyCatalog, dagManager, log),
         schedulerService, log);
   }
 
@@ -124,9 +123,8 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
           }
         }
       }
-    }
-    // Since we are going to change status to isActive=false, unschedule all 
flows
-    else {
+    } else {
+      // Since we are going to change status to isActive=false, unschedule all 
flows
       for (Spec spec : this.scheduledFlowSpecs.values()) {
         onDeleteSpec(spec.getUri(), spec.getVersion());
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index c0c9297..438bed0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -26,13 +26,14 @@ import com.google.common.base.Joiner;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
-import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -41,10 +42,11 @@ import org.apache.gobblin.util.ConfigUtils;
  * where the {@link JobSpec} will be executed.
  */
 @Data
-@AllArgsConstructor
+@EqualsAndHashCode (exclude = {"executionStatus"})
 public class JobExecutionPlan {
-  private JobSpec jobSpec;
-  private SpecExecutor specExecutor;
+  private final JobSpec jobSpec;
+  private final SpecExecutor specExecutor;
+  private ExecutionStatus executionStatus = ExecutionStatus.$UNKNOWN;
 
   public static class Factory {
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
new file mode 100644
index 0000000..0ab13ba
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+@Slf4j
+public class JobExecutionPlanListDeserializer implements 
JsonDeserializer<List<JobExecutionPlan>> {
+  /**
+   * Gson invokes this call-back method during deserialization when it 
encounters a field of the
+   * specified type.
+   * <p>In the implementation of this call-back method, you should consider 
invoking
+   * {@link JsonDeserializationContext#deserialize(JsonElement, Type)} method 
to create objects
+   * for any non-trivial field of the returned object. However, you should 
never invoke it on the
+   * the same type passing {@code json} since that will cause an infinite loop 
(Gson will call your
+   * call-back method again).
+   *
+   * @param json The Json data being deserialized
+   * @param typeOfT The type of the Object to deserialize to
+   * @param context
+   * @return a deserialized object of the specified type typeOfT which is a 
subclass of {@code T}
+   * @throws JsonParseException if json is not in the expected format of 
{@code typeofT}
+   */
+  @Override
+  public List<JobExecutionPlan> deserialize(JsonElement json, Type typeOfT, 
JsonDeserializationContext context)
+      throws JsonParseException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    JsonArray jsonArray = json.getAsJsonArray();
+
+    for (JsonElement jsonElement: jsonArray) {
+      JsonObject serializedJobExecutionPlan = (JsonObject) jsonElement;
+      JsonObject jobSpecJson = (JsonObject) 
serializedJobExecutionPlan.get(SerializationConstants.JOB_SPEC_KEY);
+      JsonObject specExecutorJson = (JsonObject) 
serializedJobExecutionPlan.get(SerializationConstants.SPEC_EXECUTOR_KEY);
+      ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(serializedJobExecutionPlan.
+          get(SerializationConstants.EXECUTION_STATUS_KEY).getAsString());
+
+      String uri = 
jobSpecJson.get(SerializationConstants.JOB_SPEC_URI_KEY).getAsString();
+      String version = 
jobSpecJson.get(SerializationConstants.JOB_SPEC_VERSION_KEY).getAsString();
+      String description = 
jobSpecJson.get(SerializationConstants.JOB_SPEC_DESCRIPTION_KEY).getAsString();
+      String templateURI = 
jobSpecJson.get(SerializationConstants.JOB_SPEC_TEMPLATE_URI_KEY).getAsString();
+      String config = 
jobSpecJson.get(SerializationConstants.JOB_SPEC_CONFIG_KEY).getAsString();
+      Config jobConfig = ConfigFactory.parseString(config);
+      JobSpec jobSpec;
+      try {
+        JobSpec.Builder builder = (uri == null) ? JobSpec.builder() : 
JobSpec.builder(uri);
+        builder = (templateURI == null) ? builder : builder.withTemplate(new 
URI(templateURI));
+        builder = (version == null) ? builder : builder.withVersion(version);
+        builder = (description == null) ? builder : 
builder.withDescription(description);
+        jobSpec = builder.withConfig(jobConfig).build();
+      } catch (URISyntaxException e) {
+        log.error("Error deserializing JobSpec {}", config);
+        throw new RuntimeException(e);
+      }
+
+      Config specExecutorConfig = 
ConfigFactory.parseString(specExecutorJson.get(SerializationConstants.SPEC_EXECUTOR_CONFIG_KEY).getAsString());
+      String className = 
specExecutorJson.get(SerializationConstants.SPEC_EXECUTOR_CLASS_KEY).getAsString();
+      SpecExecutor specExecutor;
+      try {
+        specExecutor =
+            (SpecExecutor) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), 
specExecutorConfig);
+      } catch (ReflectiveOperationException e) {
+        log.error("Error deserializing specExecuor {}", specExecutorConfig);
+        throw new RuntimeException(e);
+      }
+
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec, 
specExecutor);
+      jobExecutionPlan.setExecutionStatus(executionStatus);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return jobExecutionPlans;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
new file mode 100644
index 0000000..3de0209
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+
+@Slf4j
+public class JobExecutionPlanListSerializer implements 
JsonSerializer<List<JobExecutionPlan>> {
+  /**
+   * Gson invokes this call-back method during serialization when it 
encounters a field of the
+   * specified type.
+   *
+   * <p>In the implementation of this call-back method, you should consider 
invoking
+   * {@link JsonSerializationContext#serialize(Object, Type)} method to create 
JsonElements for any
+   * non-trivial field of the {@code src} object. However, you should never 
invoke it on the
+   * {@code src} object itself since that will cause an infinite loop (Gson 
will call your
+   * call-back method again).</p>
+   *
+   * @param src the object that needs to be converted to Json.
+   * @param typeOfSrc the actual type (fully genericized version) of the 
source object.
+   * @param context
+   * @return a JsonElement corresponding to the specified object.
+   */
+  @Override
+  public JsonElement serialize(List<JobExecutionPlan> src, Type typeOfSrc, 
JsonSerializationContext context) {
+    JsonArray jsonArray = new JsonArray();
+
+    for (JobExecutionPlan jobExecutionPlan: src) {
+      JsonObject jobExecutionPlanJson = new JsonObject();
+      JsonObject jobSpecJson = new JsonObject();
+      JobSpec jobSpec = jobExecutionPlan.getJobSpec();
+      String uri = (jobSpec.getUri() != null) ? jobSpec.getUri().toString() : 
null;
+      jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_URI_KEY, uri);
+      jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_VERSION_KEY, 
jobSpec.getVersion());
+      jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_DESCRIPTION_KEY, 
jobSpec.getDescription());
+      String jobSpecTemplateURI = (jobSpec.getTemplateURI().isPresent()) ? 
jobSpec.getTemplateURI().get().toString() : null;
+      
jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_TEMPLATE_URI_KEY, 
jobSpecTemplateURI);
+      jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_CONFIG_KEY, 
jobSpec.getConfig().root().render(ConfigRenderOptions.concise()));
+      jobExecutionPlanJson.add(SerializationConstants.JOB_SPEC_KEY, 
jobSpecJson);
+      Config specExecutorConfig;
+      try {
+         specExecutorConfig = 
jobExecutionPlan.getSpecExecutor().getConfig().get();
+      } catch (InterruptedException | ExecutionException e) {
+        log.error("Error serializing JobExecutionPlan {}", 
jobExecutionPlan.toString());
+        throw new RuntimeException(e);
+      }
+      JsonObject specExecutorJson = new JsonObject();
+      
specExecutorJson.addProperty(SerializationConstants.SPEC_EXECUTOR_CONFIG_KEY,
+          specExecutorConfig.root().render(ConfigRenderOptions.concise()));
+      
specExecutorJson.addProperty(SerializationConstants.SPEC_EXECUTOR_CLASS_KEY,
+          jobExecutionPlan.getSpecExecutor().getClass().getName());
+      jobExecutionPlanJson.add(SerializationConstants.SPEC_EXECUTOR_KEY, 
specExecutorJson);
+
+      String executionStatus = jobExecutionPlan.getExecutionStatus().name();
+      
jobExecutionPlanJson.addProperty(SerializationConstants.EXECUTION_STATUS_KEY, 
executionStatus);
+
+      jsonArray.add(jobExecutionPlanJson);
+    }
+    return jsonArray;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
new file mode 100644
index 0000000..ecffad9
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
/**
 * JSON field names shared by {@link JobExecutionPlanListSerializer} and
 * {@link JobExecutionPlanListDeserializer} when (de)serializing a {@link JobExecutionPlan}.
 * The two "config" keys do not collide because they live under different parent objects
 * ({@code jobSpec} and {@code specExecutor} respectively).
 */
public final class SerializationConstants {
  // Fields of the serialized JobSpec object.
  public static final String JOB_SPEC_URI_KEY = "uri";
  public static final String JOB_SPEC_VERSION_KEY = "version";
  public static final String JOB_SPEC_DESCRIPTION_KEY = "description";
  public static final String JOB_SPEC_TEMPLATE_URI_KEY = "templateURI";
  public static final String JOB_SPEC_CONFIG_KEY = "config";
  public static final String JOB_SPEC_KEY = "jobSpec";

  // Fields of the serialized SpecExecutor object.
  public static final String SPEC_EXECUTOR_CONFIG_KEY = "config";
  public static final String SPEC_EXECUTOR_CLASS_KEY = "class";
  public static final String SPEC_EXECUTOR_KEY = "specExecutor";

  // Top-level execution status of the JobExecutionPlan.
  public static final String EXECUTION_STATUS_KEY = "executionStatus";

  // Constants holder; not meant to be instantiated.
  private SerializationConstants() {
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
index c1134a8..d4f2bc2 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
@@ -211,7 +211,7 @@ public class IdentityFlowToJobSpecCompilerTest {
     
Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
 
     //Assert the start node has no children.
-    Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode));
+    Assert.assertEquals(jobExecutionPlanDag.getChildren(dagNode).size(), 0);
   }
 
   @Test
@@ -245,7 +245,7 @@ public class IdentityFlowToJobSpecCompilerTest {
     
Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
 
     //Assert the start node has no children.
-    Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode));
+    Assert.assertEquals(jobExecutionPlanDag.getChildren(dagNode).size(), 0);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
index 688f5de..48922d0 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
@@ -77,8 +77,8 @@ public class DagTest {
     Assert.assertTrue(childSet.contains("val5"));
 
     //Ensure end nodes have no children
-    Assert.assertNull(dag.getChildren(dagNode4));
-    Assert.assertNull(dag.getChildren(dagNode5));
+    Assert.assertEquals(dag.getChildren(dagNode4).size(), 0);
+    Assert.assertEquals(dag.getChildren(dagNode5).size(), 0);
   }
 
   @Test
@@ -172,7 +172,7 @@ public class DagTest {
     Assert.assertEquals(dagNew.getStartNodes().size(), 3);
     for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode1, dagNode6, 
dagNode7)) {
       Assert.assertTrue(dagNew.getStartNodes().contains(dagNode));
-      Assert.assertNull(dagNew.getParents(dagNode));
+      Assert.assertEquals(dagNew.getParents(dagNode).size(), 0);
       if (dagNode == dagNode1) {
         List<Dag.DagNode<String>> nextNodes = dagNew.getChildren(dagNode);
         Assert.assertEquals(nextNodes.size(), 2);
@@ -188,7 +188,7 @@ public class DagTest {
     Assert.assertEquals(dagNew.getEndNodes().size(), 3);
     for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode4, dagNode5, 
dagNode8)) {
       Assert.assertTrue(dagNew.getEndNodes().contains(dagNode));
-      Assert.assertNull(dagNew.getChildren(dagNode));
+      Assert.assertEquals(dagNew.getChildren(dagNode).size(), 0);
       if (dagNode == dagNode8) {
         Assert.assertEquals(dagNew.getParents(dagNode).size(), 2);
         Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode6));

Reply via email to