This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bee3933  [GOBBLIN-808] implement azkaban flow cancel when dag manager 
is enabled
bee3933 is described below

commit bee393352880aad5d9edb05fcc2e5f5f4e5f4811
Author: Arjun <[email protected]>
AuthorDate: Mon Jul 22 12:46:17 2019 -0700

    [GOBBLIN-808] implement azkaban flow cancel when dag manager is enabled
    
    Closes #2674 from arjun4084346/shared-gaas-cancel
---
 .../apache/gobblin/runtime/api/SpecProducer.java   |   9 ++
 gobblin-compaction/build.gradle                    |   2 +
 gobblin-runtime/build.gradle                       |   1 +
 .../InMemorySpecProducer.java                      |  22 ++-
 .../spec_executorInstance/MockedSpecExecutor.java  |  58 ++++++++
 gobblin-service/build.gradle                       |   1 +
 .../service/modules/orchestration/DagManager.java  | 161 ++++++++++++++++----
 .../modules/orchestration/DagManagerUtils.java     |  20 ++-
 .../modules/orchestration/Orchestrator.java        |  33 ++---
 .../scheduler/GobblinServiceJobScheduler.java      |   2 +-
 .../service/modules/spec/JobExecutionPlan.java     |   5 +-
 .../spec/JobExecutionPlanListDeserializer.java     |  14 ++
 .../spec/JobExecutionPlanListSerializer.java       |   9 ++
 .../modules/spec/SerializationConstants.java       |   1 +
 .../modules/orchestration/DagManagerFlowTest.java  | 164 +++++++++++++++++++++
 .../modules/orchestration/DagManagerTest.java      |  23 ++-
 .../modules/orchestration/DagTestUtils.java        |  16 +-
 .../modules/orchestration/FSDagStateStoreTest.java |   5 +-
 .../orchestration/MysqlDagStateStoreTest.java      |  13 +-
 .../monitoring/KafkaAvroJobStatusMonitorTest.java  |   1 -
 20 files changed, 482 insertions(+), 78 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
index 880847d..790e1f4 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import java.util.concurrent.Future;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.util.CompletedFuture;
 
 
 /**
@@ -48,4 +49,12 @@ public interface SpecProducer<V> {
 
   /** List all {@link Spec} being executed on {@link SpecExecutor}. */
   Future<? extends List<V>> listSpecs();
+
+  default String serializeAddSpecResponse(Future<?> response) {
+    return "";
+  }
+
+  default Future<?> deserializeAddSpecResponse(String serializedResponse) {
+    return new CompletedFuture(serializedResponse, null);
+  }
 }
\ No newline at end of file
diff --git a/gobblin-compaction/build.gradle b/gobblin-compaction/build.gradle
index 88bc161..7f8a450 100644
--- a/gobblin-compaction/build.gradle
+++ b/gobblin-compaction/build.gradle
@@ -29,6 +29,8 @@ dependencies {
 
   // Given orc-mapreduce depends on hive version of hive-storage-api(2.4.0) 
and conflicted
   // with hive-exec-core in older version(1.0.1), we need to shadow 
ord-mapreduce's transitive deps.
+  // and include direct orc-mapreduce library just as a compileOnly dependency
+  compileOnly externalDependency.orcMapreduce
   compile project(path: ":gobblin-modules:gobblin-orc-dep", 
configuration:"shadow")
 
   compile externalDependency.calciteCore
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 71dd510..b28db6b 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -74,6 +74,7 @@ dependencies {
   compile externalDependency.lombok
   compile externalDependency.metricsCore
   compile externalDependency.metricsJvm
+  compile externalDependency.mockito
   compile externalDependency.pegasus.data
   compile externalDependency.quartz
   compile externalDependency.slf4j
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
index cc74757..5e2b0bd 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
@@ -22,18 +22,19 @@ import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.util.CompletedFuture;
 
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 public class InMemorySpecProducer implements SpecProducer<Spec>, Serializable {
   private final Map<URI, Spec> provisionedSpecs;
@@ -80,4 +81,21 @@ public class InMemorySpecProducer implements 
SpecProducer<Spec>, Serializable {
   public Future<? extends List<Spec>> listSpecs() {
     return new 
CompletedFuture<>(Lists.newArrayList(provisionedSpecs.values()), null);
   }
+
+  @Override
+  public String serializeAddSpecResponse(Future future) {
+    CompletedFuture<Boolean> completedFuture = (CompletedFuture) future;
+
+    try {
+      return completedFuture.get().toString();
+    } catch (ExecutionException e) {
+      log.error("Error during future serialization in {}.", getClass(), e);
+      return "";
+    }
+  }
+
+  @Override
+  public Future<?> deserializeAddSpecResponse(String serializedResponse) {
+    return new CompletedFuture(Boolean.valueOf(serializedResponse), null);
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
new file mode 100644
index 0000000..10f7786
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import java.net.URI;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.mockito.Mockito;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+
+public class MockedSpecExecutor extends InMemorySpecExecutor {
+  private SpecProducer<Spec> mockedSpecProducer;
+
+  public MockedSpecExecutor(Config config) {
+    super(config);
+    this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
+    when(mockedSpecProducer.addSpec(any())).thenReturn(new 
CompletedFuture(Boolean.TRUE, null));
+    }
+
+  public static SpecExecutor createDummySpecExecutor(URI uri) {
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
uri.toString());
+    return new MockedSpecExecutor(ConfigFactory.parseProperties(properties));
+  }
+
+  @Override
+  public Future<? extends SpecProducer> getProducer(){
+    return new CompletedFuture(this.mockedSpecProducer, null);
+  }
+}
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 2701022..bafa6d1 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -30,6 +30,7 @@ dependencies {
   compile 
project(":gobblin-restli:gobblin-flow-config-service:gobblin-flow-config-service-server")
   compile 
project(":gobblin-restli:gobblin-flow-config-service:gobblin-flow-config-service-client")
   compile project(":gobblin-restli:gobblin-restli-utils")
+  compile project(":gobblin-modules:gobblin-azkaban")
 
   compile externalDependency.avro
   compile externalDependency.avroMapredH2
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2fbb796..e8cf1a4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -26,9 +26,13 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +46,7 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -49,6 +54,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
@@ -57,7 +63,7 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -79,7 +85,8 @@ import static 
org.apache.gobblin.service.ExecutionStatus.valueOf;
 /**
  * This class implements a manager to manage the life cycle of a {@link Dag}. 
A {@link Dag} is submitted to the
  * {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. On 
receiving a {@link Dag}, the
- * {@link DagManager} first persists the {@link Dag} to the {@link 
DagStateStore}, and then submits it to a {@link BlockingQueue}.
+ * {@link DagManager} first persists the {@link Dag} to the {@link 
DagStateStore}, and then submits it to the specific
+ * {@link DagManagerThread}'s {@link BlockingQueue} based on the 
flowExecutionId of the Flow.
  * This guarantees that each {@link Dag} received by the {@link DagManager} 
can be recovered in case of a leadership
  * change or service restart.
  *
@@ -89,6 +96,11 @@ import static 
org.apache.gobblin.service.ExecutionStatus.valueOf;
  * jobs. Upon completion of a job, it will either schedule the next job in the 
Dag (on SUCCESS) or mark the Dag as failed
  * (on FAILURE). Upon completion of a Dag execution, it will perform the 
required clean up actions.
  *
+ * For deleteSpec/cancellation requests for a flow URI, {@link DagManager} 
finds out the flowExecutionId using
+ * {@link JobStatusRetriever}, and forwards the request to the {@link 
DagManagerThread} which handled the addSpec request
+ * for this flow. We need separate {@value queue} and {@value cancelQueue} for 
each {@link DagManagerThread} because
+ * cancellation needs the information which is stored only in the same {@link 
DagManagerThread}.
+ *
  * The {@link DagManager} is active only in the leader mode. To ensure, each 
{@link Dag} managed by a {@link DagManager} is
  * checkpointed to a persistent location. On start up or leadership change,
  * the {@link DagManager} loads all the checkpointed {@link Dag}s and adds 
them to the {@link  BlockingQueue}.
@@ -105,8 +117,8 @@ public class DagManager extends AbstractIdleService {
   private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
   private static final Integer DEFAULT_NUM_THREADS = 3;
   private static final Integer TERMINATION_TIMEOUT = 30;
-  private static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + 
"numThreads";
-  private static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
DAG_MANAGER_PREFIX + "pollingInterval";
+  public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + 
"numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
DAG_MANAGER_PREFIX + "pollingInterval";
   private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = 
JOB_STATUS_RETRIEVER_KEY + ".class";
   private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = 
FsJobStatusRetriever.class.getName();
   private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + 
"dagStateStoreClass";
@@ -136,7 +148,10 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
-  private BlockingQueue<Dag<JobExecutionPlan>> queue;
+  private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
+  private BlockingQueue<String>[] cancelQueue;
+  DagManagerThread[] dagManagerThreads;
+
   private ScheduledExecutorService scheduledExecutorPool;
   private boolean instrumentationEnabled;
   private DagStateStore dagStateStore;
@@ -144,6 +159,7 @@ public class DagManager extends AbstractIdleService {
 
   private final Integer numThreads;
   private final Integer pollingInterval;
+  @Getter
   private final JobStatusRetriever jobStatusRetriever;
   private final KafkaJobStatusMonitor jobStatusMonitor;
   private final Config config;
@@ -153,8 +169,9 @@ public class DagManager extends AbstractIdleService {
 
   public DagManager(Config config, boolean instrumentationEnabled) {
     this.config = config;
-    this.queue = new LinkedBlockingDeque<>();
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.queue = initializeDagQueue(this.numThreads);
+    this.cancelQueue = initializeDagQueue(this.numThreads);
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.instrumentationEnabled = instrumentationEnabled;
@@ -166,15 +183,41 @@ public class DagManager extends AbstractIdleService {
     }
 
     try {
-      Class jobStatusRetrieverClass = 
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, 
DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
-      this.jobStatusMonitor = new 
KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
-      this.jobStatusRetriever =
-          (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, 
config);
+      this.jobStatusMonitor = createJobStatusMonitor(config);
+      this.jobStatusRetriever = createJobStatusRetriever(config);
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException("Exception encountered during DagManager 
initialization", e);
     }
   }
 
+  JobStatusRetriever createJobStatusRetriever(Config config) throws 
ReflectiveOperationException {
+    Class jobStatusRetrieverClass = 
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, 
DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
+    return (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, 
config);
+  }
+
+  KafkaJobStatusMonitor createJobStatusMonitor(Config config) throws 
ReflectiveOperationException {
+    return new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
+  }
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    try {
+      Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, 
DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Initializes and returns an array of Queue of size numThreads
+  private static LinkedBlockingDeque[] initializeDagQueue(int numThreads) {
+    LinkedBlockingDeque[] queue = new LinkedBlockingDeque[numThreads];
+
+    for (int i=0; i< numThreads; i++) {
+      queue[i] = new LinkedBlockingDeque<>();
+    }
+    return queue;
+  }
+
   public DagManager(Config config) {
     this(config, true);
   }
@@ -192,14 +235,19 @@ public class DagManager extends AbstractIdleService {
    * submitted dag to the {@link DagStateStore} and then adds the dag to a 
{@link BlockingQueue} to be picked up
    * by one of the {@link DagManagerThread}s.
    */
-  synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException {
+  synchronized void addDag(Dag<JobExecutionPlan> dag) throws IOException {
     //Persist the dag
     this.dagStateStore.writeCheckpoint(dag);
-    submitEventsAndSetStatus(dag);
-    //Add it to the queue of dags
-    if (!this.queue.offer(dag)) {
+    long flowExecutionId = DagManagerUtils.getFlowExecId(dag);
+    int queueId = (int) (flowExecutionId % this.numThreads);
+    // Add the dag to the specific queue determined by flowExecutionId
+    // Flow cancellation request has to be forwarded to the same 
DagManagerThread where the
+    // flow create request was forwarded. This is because Azkaban Exec Id is 
stored in the DagNode of the
+    // specific DagManagerThread queue
+    if (!this.queue[queueId].offer(dag)) {
       throw new IOException("Could not add dag" + 
DagManagerUtils.generateDagId(dag) + "to queue");
     }
+    submitEventsAndSetStatus(dag);
   }
 
   private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
@@ -213,6 +261,26 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
+  /**
+   * Method to submit a {@link URI} for cancellation requests to the {@link 
DagManager}.
+   * The {@link DagManager} adds the dag to the {@link BlockingQueue} to be 
picked up by one of the {@link DagManagerThread}s.
+   */
+  synchronized public void stopDag(URI uri) throws IOException {
+    String flowGroup = 
FlowConfigResourceLocalHandler.FlowUriUtils.getFlowGroup(uri);
+    String flowName = 
FlowConfigResourceLocalHandler.FlowUriUtils.getFlowName(uri);
+
+    List<Long> flowExecutionIds = 
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+    log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+    for (long flowExecutionId : flowExecutionIds) {
+      int queueId = (int) (flowExecutionId % this.numThreads);
+      String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, 
flowExecutionId);
+      if (!this.cancelQueue[queueId].offer(dagId)) {
+        throw new IOException("Could not add dag " + dagId + " to cancellation 
queue.");
+      }
+    }
+  }
+
   public synchronized void setTopologySpecMap(Map<URI, TopologySpec> 
topologySpecMap) {
     this.topologySpecMap = topologySpecMap;
   }
@@ -234,20 +302,22 @@ public class DagManager extends AbstractIdleService {
         log.info("Activating DagManager.");
         log.info("Scheduling {} DagManager threads", numThreads);
         //Initializing state store for persisting Dags.
-        Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, 
DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
-        this.dagStateStore = (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+        this.dagStateStore = createDagStateStore(config, topologySpecMap);
 
         //On startup, the service creates DagManagerThreads that are scheduled 
at a fixed rate.
+        this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
-          this.scheduledExecutorPool.scheduleAtFixedRate(new 
DagManagerThread(jobStatusRetriever, dagStateStore, queue, 
instrumentationEnabled), 0, this.pollingInterval,
-              TimeUnit.SECONDS);
+          DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore,
+              queue[i], cancelQueue[i], instrumentationEnabled);
+          this.dagManagerThreads[i] = dagManagerThread;
+          this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, 
this.pollingInterval, TimeUnit.SECONDS);
         }
         if ((this.jobStatusMonitor != null) && 
(!this.jobStatusMonitor.isRunning())) {
           log.info("Starting job status monitor");
           jobStatusMonitor.startAsync().awaitRunning();
         }
         for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
-          offer(dag);
+          addDag(dag);
         }
       } else { //Mark the DagManager inactive.
         log.info("Inactivating the DagManager. Shutting down all DagManager 
threads");
@@ -255,12 +325,12 @@ public class DagManager extends AbstractIdleService {
         try {
           this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, 
TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-          log.error("Exception {} encountered when shutting down DagManager 
threads.", e);
+          log.error("Exception encountered when shutting down DagManager 
threads.", e);
         }
         log.info("Shutting down JobStatusMonitor");
         this.jobStatusMonitor.shutDown();
       }
-    } catch (IOException | ReflectiveOperationException e) {
+    } catch (IOException e) {
       log.error("Exception encountered when activating the new DagManager", e);
       throw new RuntimeException(e);
     }
@@ -277,7 +347,7 @@ public class DagManager extends AbstractIdleService {
   public static class DagManagerThread implements Runnable {
     private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
-    private final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs 
= new HashMap<>();
+    final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
     private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
     private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
     private final MetricContext metricContext;
@@ -287,15 +357,17 @@ public class DagManager extends AbstractIdleService {
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
     private BlockingQueue<Dag<JobExecutionPlan>> queue;
+    private BlockingQueue<String> cancelQueue;
 
     /**
      * Constructor.
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore,
-        BlockingQueue<Dag<JobExecutionPlan>> queue, boolean 
instrumentationEnabled) {
+        BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> 
cancelQueue, boolean instrumentationEnabled) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.queue = queue;
+      this.cancelQueue = cancelQueue;
       if (instrumentationEnabled) {
         this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
         this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
@@ -314,10 +386,15 @@ public class DagManager extends AbstractIdleService {
     @Override
     public void run() {
       try {
-        Object nextItem = queue.poll();
+        String nextDagToCancel = cancelQueue.poll();
+        //Poll the cancelQueue for a new Dag to cancel.
+        if (nextDagToCancel != null) {
+          cancelDag(nextDagToCancel);
+        }
+
+        Dag<JobExecutionPlan> dag = queue.poll();
         //Poll the queue for a new Dag to execute.
-        if (nextItem != null) {
-          Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) nextItem;
+        if (dag != null) {
           if (dag.isEmpty()) {
             log.info("Empty dag; ignoring the dag");
           }
@@ -337,6 +414,30 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
+    private void cancelDag(String dagToCancel) throws ExecutionException, 
InterruptedException {
+      log.info("Cancel flow with DagId {}", dagToCancel);
+      if (this.dagToJobs.containsKey(dagToCancel)) {
+        List<DagNode<JobExecutionPlan>> dagNodesToCancel = 
this.dagToJobs.get(dagToCancel);
+        log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+        for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+          Properties props = new Properties();
+          if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+            Future future = dagNodeToCancel.getValue().getJobFuture().get();
+            if (future instanceof CompletableFuture &&
+              future.get() instanceof AzkabanExecuteFlowStatus.ExecuteId) {
+              CompletableFuture<AzkabanExecuteFlowStatus.ExecuteId> 
completableFuture = (CompletableFuture) future;
+              String azkabanExecId = completableFuture.get().getExecId();
+              props.put(ConfigurationKeys.AZKABAN_EXEC_ID, azkabanExecId);
+              log.info("Cancel job with azkaban exec id {}.", azkabanExecId);
+            }
+          }
+          DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(null, 
props);
+        }
+      } else {
+        log.warn("Did not find Dag with id {}, it might be already 
cancelled.", dagToCancel);
+      }
+    }
+
     /**
      * This method determines the next set of jobs to execute from the dag and 
submits them for execution.
      * This method updates internal data structures tracking currently running 
Dags and jobs.
@@ -450,8 +551,8 @@ public class DagManager extends AbstractIdleService {
     }
 
     /**
-     * Obtain next dag
-     * @param dagId The dagId that has been processed.
+     * Submit next set of Dag nodes in the Dag identified by the provided dagId
+     * @param dagId The dagId that should be processed.
      * @return
      * @throws IOException
      */
@@ -493,7 +594,9 @@ public class DagManager extends AbstractIdleService {
         // The SpecProducer implementations submit the job to the underlying 
executor and return when the submission is complete,
         // either successfully or unsuccessfully. To catch any exceptions in 
the job submission, the DagManagerThread
         // blocks (by calling Future#get()) until the submission is completed.
-        producer.addSpec(jobSpec).get();
+        Future addSpecFuture = producer.addSpec(jobSpec);
+        dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
+
         if (this.metricContext != null) {
           getRunningJobsCounter(dagNode).inc();
         }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 110ae0e..46fc59a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -58,9 +58,23 @@ public class DagManagerUtils {
    * @return a String id associated corresponding to the {@link Dag} instance.
    */
   static String generateDagId(Dag<JobExecutionPlan> dag) {
-    FlowId flowId = getFlowId(dag);
-    Long flowExecutionId = getFlowExecId(dag);
-    return Joiner.on("_").join(flowId.getFlowGroup(), flowId.getFlowName(), 
flowExecutionId);
+    return 
generateDagId(dag.getStartNodes().get(0).getValue().getJobSpec().getConfig());
+  }
+
+  static String generateDagId(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return generateDagId(dagNode.getValue().getJobSpec().getConfig());
+  }
+
+  private static String generateDagId(Config jobConfig) {
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return generateDagId(flowGroup, flowName, flowExecutionId);
+  }
+
+  static String generateDagId(String flowGroup, String flowName, long 
flowExecutionId) {
+    return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index d8d5480..f987682 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.Collections;
@@ -283,7 +284,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
       if (this.dagManager.isPresent()) {
         //Send the dag to the DagManager.
-        this.dagManager.get().offer(jobExecutionPlanDag);
+        this.dagManager.get().addDag(jobExecutionPlanDag);
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
         for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
@@ -341,33 +342,17 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     }
   }
 
-  public void remove(Spec spec, Properties headers) {
+  public void remove(Spec spec, Properties headers) throws IOException {
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all 
executions
     if (spec instanceof FlowSpec) {
-      Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
-
-      if (jobExecutionPlanDag.isEmpty()) {
-        _log.warn("Cannot determine an executor to delete Spec: " + spec);
-        return;
-      }
-
-      // Delete all compiled JobSpecs on their respective Executor
-      for (Dag.DagNode<JobExecutionPlan> dagNode: 
jobExecutionPlanDag.getNodes()) {
-        JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-        // Delete this spec on selected executor
-        SpecProducer producer = null;
-        try {
-          producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-          Spec jobSpec = jobExecutionPlan.getJobSpec();
-
-          _log.info(String.format("Going to delete JobSpec: %s on Executor: 
%s", jobSpec, producer));
-          producer.deleteSpec(jobSpec.getUri(), headers);
-        } catch (Exception e) {
-          _log.error("Cannot successfully delete spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer
-              + " for flow: " + spec, e);
-        }
+      if (this.dagManager.isPresent()) {
+        //Send the dag to the DagManager.
+        _log.info("Forwarding cancel request for flow URI {} to DagManager.", 
spec.getUri());
+        this.dagManager.get().stopDag(spec.getUri());
+      } else {
+        _log.warn("Operation not supported.");
       }
     } else {
       throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " 
+ spec);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index a691811..baac7d6 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -349,7 +349,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
             "Spec with URI: %s was not found in cache. May be it was cleaned, 
if not please " + "clean it manually",
             deletedSpecURI));
       }
-    } catch (JobException e) {
+    } catch (JobException | IOException e) {
       _log.warn(String.format("Spec with URI: %s was not unscheduled 
cleaning", deletedSpecURI), e);
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index acdafea..27d76cb 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.service.modules.spec;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -52,7 +54,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * where the {@link JobSpec} will be executed.
  */
 @Data
-@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts"})
+@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts", 
"jobFuture"})
 public class JobExecutionPlan {
   public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
 
@@ -61,6 +63,7 @@ public class JobExecutionPlan {
   private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
   private final int maxAttempts;
   private int currentAttempts = 0;
+  private Optional<Future> jobFuture = Optional.absent();
 
   public static class Factory {
     public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index 11b5e3f..1f988c2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -23,7 +23,10 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
+import com.google.common.base.Optional;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonDeserializationContext;
 import com.google.gson.JsonDeserializer;
@@ -107,6 +110,17 @@ public class JobExecutionPlanListDeserializer implements 
JsonDeserializer<List<J
 
       JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec, 
specExecutor);
       jobExecutionPlan.setExecutionStatus(executionStatus);
+
+      try {
+        String jobExecutionFuture = 
serializedJobExecutionPlan.get(SerializationConstants.JOB_EXECUTION_FUTURE).getAsString();
+        Future future = 
specExecutor.getProducer().get().deserializeAddSpecResponse(jobExecutionFuture);
+        jobExecutionPlan.setJobFuture(Optional.fromNullable(future));
+
+      } catch (ExecutionException | InterruptedException e) {
+        log.warn("Error during deserialization of JobExecutionFuture.");
+        throw new RuntimeException(e);
+      }
+
       jobExecutionPlans.add(jobExecutionPlan);
     }
     return jobExecutionPlans;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
index 3de0209..5a38c19 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
@@ -83,6 +83,15 @@ public class JobExecutionPlanListSerializer implements 
JsonSerializer<List<JobEx
       String executionStatus = jobExecutionPlan.getExecutionStatus().name();
       
jobExecutionPlanJson.addProperty(SerializationConstants.EXECUTION_STATUS_KEY, 
executionStatus);
 
+      try {
+        String jobExecutionFuture = 
jobExecutionPlan.getSpecExecutor().getProducer().get()
+            
.serializeAddSpecResponse(jobExecutionPlan.getJobFuture().orNull());
+        
jobExecutionPlanJson.addProperty(SerializationConstants.JOB_EXECUTION_FUTURE, 
jobExecutionFuture);
+      } catch (InterruptedException | ExecutionException e) {
+        log.warn("Error during serialization of JobExecutionFuture.");
+        throw new RuntimeException(e);
+      }
+
       jsonArray.add(jobExecutionPlanJson);
     }
     return jsonArray;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
index aa8ea9e..5359465 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -31,4 +31,5 @@ public class SerializationConstants {
   public static final String SPEC_EXECUTOR_URI_KEY = "uri";
 
   public static final String EXECUTION_STATUS_KEY = "executionStatus";
+  public static final String JOB_EXECUTION_FUTURE = "jobExecutionFuture";
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
new file mode 100644
index 0000000..1300458
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class DagManagerFlowTest {
+  DagManager dagManager;
+
+  @BeforeClass
+  public void setUp() {
+    Properties props = new Properties();
+    props.put(DagManager.JOB_STATUS_POLLING_INTERVAL_KEY, 1);
+    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), 
false);
+    dagManager.setActive(true);
+  }
+
+  @Test
+  void testAddDeleteSpec() throws Exception {
+    Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0", 123456780L, 
"FINISH_RUNNING", 1);
+    Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", 123456781L, 
"FINISH_RUNNING", 1);
+    Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", 123456782L, 
"FINISH_RUNNING", 1);
+
+    // mock add spec
+    dagManager.addDag(dag1);
+    dagManager.addDag(dag2);
+    dagManager.addDag(dag3);
+
+    // check existence of dag in dagToJobs map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> 
dagManager.dagManagerThreads[0].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag1)),
 "Waiting for the map to update");
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+        assertTrue(input -> 
dagManager.dagManagerThreads[1].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag2)),
 "Waiting for the map to update");
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+        assertTrue(input -> 
dagManager.dagManagerThreads[2].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag3)),
 "Waiting for the map to update");
+
+    // mock delete spec
+    
dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new
 FlowId().setFlowGroup("group0").setFlowName("flow0")));
+    
dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new
 FlowId().setFlowGroup("group1").setFlowName("flow1")));
+    
dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new
 FlowId().setFlowGroup("group2").setFlowName("flow2")));
+
+    // verify deleteSpec() of specProducer is called once
+    
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new 
DeletePredicate(dag1), "Waiting for the map to update");
+    
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new 
DeletePredicate(dag2), "Waiting for the map to update");
+    
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new 
DeletePredicate(dag3), "Waiting for the map to update");
+
+    // mock flow cancellation tracking event
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", 
123456780L, "group0", "job0", String.valueOf(
+        ExecutionStatus.CANCELLED)))
+        
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0",
 "group0", 123456780L, "job0", "group0");
+
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1", 
123456781L, "group1", "job0", String.valueOf(
+        ExecutionStatus.CANCELLED)))
+        
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1",
 "group1", 123456781L, "job0", "group1");
+
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2", 
123456782L, "group2", "job0", String.valueOf(
+        ExecutionStatus.CANCELLED)))
+        
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2",
 "group2", 123456782L, "job0", "group2");
+
+    // check removal of dag in dagToJobs map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> 
!dagManager.dagManagerThreads[0].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag1)),
 "Waiting for the map to update");
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+        assertTrue(input -> 
!dagManager.dagManagerThreads[1].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag2)),
 "Waiting for the map to update");
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+        assertTrue(input -> 
!dagManager.dagManagerThreads[2].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag3)),
 "Waiting for the map to update");
+  }
+}
+
+class DeletePredicate implements Predicate {
+  private final Dag<JobExecutionPlan> dag;
+  public DeletePredicate(Dag<JobExecutionPlan> dag) {
+    this.dag = dag;
+  }
+
+  @Override
+  public boolean apply(@Nullable Object input) {
+    try {
+      
verify(dag.getNodes().get(0).getValue().getSpecExecutor().getProducer().get()).deleteSpec(any(),
 any());
+    } catch (Throwable e) {
+      return false;
+    }
+    return true;
+  }
+}
+
+class MockedDagManager extends DagManager {
+
+  public MockedDagManager(Config config, boolean instrumentationEnabled) {
+    super(config, instrumentationEnabled);
+  }
+
+  @Override
+  JobStatusRetriever createJobStatusRetriever(Config config) {
+    JobStatusRetriever mockedJbStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    
Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).getJobStatusesForFlowExecution(anyString(),
 anyString(), anyLong(), anyString(), anyString());
+    when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow0"), 
eq("group0"), anyInt())).thenReturn(Collections.singletonList(123456780L));
+    when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow1"), 
eq("group1"), anyInt())).thenReturn(Collections.singletonList(123456781L));
+    when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow2"), 
eq("group2"), anyInt())).thenReturn(Collections.singletonList(123456782L));
+    return  mockedJbStatusRetriever;
+  }
+
+  @Override
+  KafkaJobStatusMonitor createJobStatusMonitor(Config config) {
+    return null;
+  }
+
+  @Override
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+    DagStateStore mockedDagStateStore = Mockito.mock(DagStateStore.class);
+
+    try {
+      doNothing().when(mockedDagStateStore).writeCheckpoint(any());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return mockedDagStateStore;
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 0165bb2..c250a49 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -46,7 +46,7 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
@@ -62,6 +62,7 @@ public class DagManagerTest {
   private JobStatusRetriever _jobStatusRetriever;
   private DagManager.DagManagerThread _dagManagerThread;
   private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
+  private LinkedBlockingQueue<String> cancelQueue;
   private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
   private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
   private Map<String, Dag<JobExecutionPlan>> dags;
@@ -75,7 +76,9 @@ public class DagManagerTest {
     this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
     this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     this.queue = new LinkedBlockingQueue<>();
-    this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, true);
+    this.cancelQueue = new LinkedBlockingQueue<>();
+    this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, 
cancelQueue,
+        true);
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
@@ -95,10 +98,16 @@ public class DagManagerTest {
    * Create a {@link Dag <JobExecutionPlan>}.
    * @return a Dag.
    */
-  private Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, 
String flowFailureOption, boolean flag)
+  static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, 
String flowFailureOption, boolean flag)
       throws URISyntaxException {
-    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
     int numNodes = (flag) ? 3 : 5;
+    return buildDag(id, flowExecutionId, flowFailureOption, numNodes);
+  }
+
+  static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, 
String flowFailureOption, int numNodes)
+      throws URISyntaxException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
     for (int i = 0; i < numNodes; i++) {
       String suffix = Integer.toString(i);
       Config jobConfig = ConfigBuilder.create().
@@ -117,18 +126,18 @@ public class DagManagerTest {
       }
       JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
           withTemplate(new URI("job" + suffix)).build();
-      SpecExecutor specExecutor = 
InMemorySpecExecutor.createDummySpecExecutor(new URI("job" + i));
+      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI("job" + i));
       JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
       jobExecutionPlans.add(jobExecutionPlan);
     }
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
 
-  private Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String 
eventName) {
+  static Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String 
eventName) {
     return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, 
jobName, eventName, false);
   }
 
-  private Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup,  Long flowExecutionId, String jobGroup, String jobName, String 
eventName, boolean shouldRetry) {
+  private static Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup,  Long flowExecutionId, String jobGroup, String jobName, String 
eventName, boolean shouldRetry) {
     return 
Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId).
         message("Test 
message").eventName(eventName).startTime(5000L).shouldRetry(shouldRetry).build());
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
index f63c6ad..e87c4d6 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
@@ -23,6 +23,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
@@ -33,11 +39,8 @@ import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.hadoop.fs.Path;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 
 
 public class DagTestUtils {
@@ -86,6 +89,11 @@ public class DagTestUtils {
       SpecExecutor specExecutor = 
buildNaiveTopologySpec("mySpecExecutor").getSpecExecutor();
       JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
       jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+
+      // Future of type CompletedFuture is used because in tests 
InMemorySpecProducer is used and that responds with CompletedFuture
+      CompletedFuture future = new CompletedFuture<>(Boolean.TRUE, null);
+      jobExecutionPlan.setJobFuture(Optional.of(future));
+
       jobExecutionPlans.add(jobExecutionPlan);
     }
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index a38929a..b2fa5f5 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
@@ -110,7 +111,7 @@ public class FSDagStateStoreTest {
   }
 
   @Test (dependsOnMethods = "testCleanUp")
-  public void testGetDags() throws IOException, URISyntaxException {
+  public void testGetDags() throws IOException, URISyntaxException, 
ExecutionException, InterruptedException {
     //Set up a new FSDagStateStore instance.
     setUp();
     List<Long> flowExecutionIds = 
Lists.newArrayList(System.currentTimeMillis(), System.currentTimeMillis() + 1);
@@ -129,6 +130,8 @@ public class FSDagStateStoreTest {
       Assert.assertEquals(dag.getParentChildMap().size(), 1);
       
Assert.assertEquals(dag.getNodes().get(0).getValue().getSpecExecutor().getUri(),
 specExecURI);
       
Assert.assertEquals(dag.getNodes().get(1).getValue().getSpecExecutor().getUri(),
 specExecURI);
+      
Assert.assertTrue(Boolean.parseBoolean(dag.getNodes().get(0).getValue().getJobFuture().get().get().toString()));
+      
Assert.assertTrue(Boolean.parseBoolean(dag.getNodes().get(1).getValue().getJobFuture().get().get().toString()));
     }
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index 6ad85d6..a4d39e6 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -23,6 +23,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.dbcp.BasicDataSource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -34,11 +40,6 @@ import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
 
 
 /**
@@ -97,6 +98,8 @@ public class MysqlDagStateStoreTest {
       
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), 
"flow" + "random_0");
       
Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), 
123L);
       Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+      
Assert.assertTrue(Boolean.parseBoolean(plan.getJobFuture().get().get().toString()));
+      
Assert.assertTrue(Boolean.parseBoolean(plan.getJobFuture().get().get().toString()));
     }
 
     dagDeserialized = dags.get(1);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index ad48f34..14668ee 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -28,7 +28,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;

Reply via email to