This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7350c20  [GOBBLIN-1438] Only load failed dags at time of resume
7350c20 is described below

commit 7350c20076afc83dc1110dc5e7ec1ca650a367b5
Author: Jack Moseley <[email protected]>
AuthorDate: Thu May 6 18:01:19 2021 -0700

    [GOBBLIN-1438] Only load failed dags at time of resume
    
    Closes #3272 from jack-moseley/failed-dag-memory
---
 .../service/modules/orchestration/DagManager.java  | 56 ++++++++++----------
 .../modules/orchestration/DagManagerUtils.java     | 16 ++----
 .../modules/orchestration/DagStateStore.java       | 18 +++++++
 .../modules/orchestration/FSDagStateStore.java     | 40 ++++++++++++++-
 .../modules/orchestration/MysqlDagStateStore.java  | 56 ++++++++++++++++++--
 .../modules/orchestration/DagManagerTest.java      | 59 ++++++++++++++++------
 .../modules/orchestration/FSDagStateStoreTest.java | 14 +++++
 .../orchestration/MysqlDagStateStoreTest.java      | 19 +++++--
 8 files changed, 214 insertions(+), 64 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index da46d72..5ca1fed 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -188,7 +189,6 @@ public class DagManager extends AbstractIdleService {
   private final int defaultQuota;
   private final Map<String, Integer> perUserQuota;
   private final long failedDagRetentionTime;
-  private final Map<String, Dag<JobExecutionPlan>> failedDags = new 
ConcurrentHashMap<>();
 
   private volatile boolean isActive = false;
 
@@ -372,16 +372,17 @@ public class DagManager extends AbstractIdleService {
         //Initializing state store for persisting Dags.
         this.dagStateStore = createDagStateStore(config, topologySpecMap);
         this.failedDagStateStore = 
createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+        Set<String> failedDagIds = 
Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
 
         //On startup, the service creates DagManagerThreads that are scheduled 
at a fixed rate.
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
-              queue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, defaultQuota, perUserQuota, failedDags);
+              queue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, 
this.pollingInterval, TimeUnit.SECONDS);
         }
-        FailedDagRetentionThread failedDagRetentionThread = new 
FailedDagRetentionThread(failedDagStateStore, failedDags, 
failedDagRetentionTime);
+        FailedDagRetentionThread failedDagRetentionThread = new 
FailedDagRetentionThread(failedDagStateStore, failedDagIds, 
failedDagRetentionTime);
         
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, 
retentionPollingInterval, TimeUnit.MINUTES);
         List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
         log.info("Loading " + dags.size() + " dags from dag state store");
@@ -416,7 +417,7 @@ public class DagManager extends AbstractIdleService {
     private static final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
     private static final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
-    private Map<String, Dag<JobExecutionPlan>> failedDags;
+    private Set<String> failedDagIds;
     private final Map<String, Dag<JobExecutionPlan>> resumingDags = new 
HashMap<>();
     // dagToJobs holds a map of dagId to running jobs of that dag
     final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
@@ -443,18 +444,11 @@ public class DagManager extends AbstractIdleService {
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> 
cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> 
perUserQuota, Map<String, Dag<JobExecutionPlan>> failedDags) {
+        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> 
perUserQuota, Set<String> failedDagIds) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
-      try {
-        this.failedDags = failedDags;
-        for (Dag<JobExecutionPlan> dag : failedDagStateStore.getDags()) {
-          this.failedDags.put(DagManagerUtils.generateDagId(dag), dag);
-        }
-      } catch (IOException e) {
-        log.error("Failed to load previously failed dags into memory", e);
-      }
+      this.failedDagIds = failedDagIds;
       this.queue = queue;
       this.cancelQueue = cancelQueue;
       this.resumeQueue = resumeQueue;
@@ -524,12 +518,16 @@ public class DagManager extends AbstractIdleService {
      * Begin resuming a dag by setting the status of both the dag and the 
failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
      * and also sending events so that this status will be reflected in the 
job status state store.
      */
-    private void beginResumingDag(String dagId) {
-      Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
-      if (dag == null) {
+    private void beginResumingDag(String dagId) throws IOException {
+      if (!this.failedDagIds.contains(dagId)) {
         log.warn("No dag found with dagId " + dagId + ", so cannot resume 
flow");
         return;
       }
+      Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
+      if (dag == null) {
+        log.error("Dag " + dagId + " was found in memory but not found in 
failed dag state store");
+        return;
+      }
 
       long flowResumeTime = System.currentTimeMillis();
 
@@ -575,7 +573,7 @@ public class DagManager extends AbstractIdleService {
         if (dagReady) {
           this.dagStateStore.writeCheckpoint(dag.getValue());
           this.failedDagStateStore.cleanUp(dag.getValue());
-          this.failedDags.remove(dag.getKey());
+          this.failedDagIds.remove(dag.getKey());
           this.resumingDags.remove(dag.getKey());
           initialize(dag.getValue());
         }
@@ -1190,7 +1188,7 @@ public class DagManager extends AbstractIdleService {
       } catch (IOException e) {
         log.error("Failed to add dag " + dagId + " to failed dag state store", 
e);
       }
-      this.failedDags.put(dagId, this.dags.get(dagId));
+      this.failedDagIds.add(dagId);
     }
 
     /**
@@ -1229,12 +1227,12 @@ public class DagManager extends AbstractIdleService {
    */
   public static class FailedDagRetentionThread implements Runnable {
     private final DagStateStore failedDagStateStore;
-    private final Map<String, Dag<JobExecutionPlan>> failedDags;
+    private final Set<String> failedDagIds;
     private final long failedDagRetentionTime;
 
-    FailedDagRetentionThread(DagStateStore failedDagStateStore, Map<String, 
Dag<JobExecutionPlan>> failedDags, long failedDagRetentionTime) {
+    FailedDagRetentionThread(DagStateStore failedDagStateStore, Set<String> 
failedDagIds, long failedDagRetentionTime) {
       this.failedDagStateStore = failedDagStateStore;
-      this.failedDags = failedDags;
+      this.failedDagIds = failedDagIds;
       this.failedDagRetentionTime = failedDagRetentionTime;
     }
 
@@ -1243,20 +1241,20 @@ public class DagManager extends AbstractIdleService {
       try {
         log.info("Cleaning failed dag state store");
         long startTime = System.currentTimeMillis();
-        List<String> dagKeysToClean = new ArrayList<>();
+        List<String> dagIdsToClean = new ArrayList<>();
 
-        for (Map.Entry<String, Dag<JobExecutionPlan>> entry : 
this.failedDags.entrySet()) {
-          if (this.failedDagRetentionTime > 0L && startTime > 
DagManagerUtils.getFlowExecId(entry.getValue()) + this.failedDagRetentionTime) {
-            this.failedDagStateStore.cleanUp(entry.getValue());
-            dagKeysToClean.add(entry.getKey());
+        for (String dagId : this.failedDagIds) {
+          if (this.failedDagRetentionTime > 0L && startTime > 
DagManagerUtils.getFlowExecId(dagId) + this.failedDagRetentionTime) {
+            this.failedDagStateStore.cleanUp(dagId);
+            dagIdsToClean.add(dagId);
           }
         }
 
-        for (String key : dagKeysToClean) {
-          this.failedDags.remove(key);
+        for (String dagId : dagIdsToClean) {
+          this.failedDagIds.remove(dagId);
         }
 
-      log.info("Cleaned " + dagKeysToClean.size() + " from the failed dag 
state store");
+      log.info("Cleaned " + dagIdsToClean.size() + " dags from the failed dag 
state store");
       } catch (Exception e) {
         log.error("Failed to run retention on failed dag state store", e);
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index b55b0e1..e2bd0d1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -71,6 +71,10 @@ public class DagManagerUtils {
     return 
jobSpec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
   }
 
+  static long getFlowExecId(String dagId) {
+    return Long.parseLong(dagId.substring(dagId.lastIndexOf('_') + 1));
+  }
+
   /**
    * Generate a dagId from the given {@link Dag} instance.
    * @param dag instance of a {@link Dag}.
@@ -97,18 +101,6 @@ public class DagManagerUtils {
   }
 
   /**
-   * Generate a FlowId from the given {@link Dag} instance.
-   * FlowId, comparing to DagId, doesn't contain FlowExecutionId so different 
{@link Dag} could possibly have same
-   * {@link FlowId}.
-   * @param dag
-   * @return
-   */
-  static String generateFlowIdInString(Dag<JobExecutionPlan> dag) {
-    FlowId flowId = getFlowId(dag);
-    return Joiner.on("_").join(flowId.getFlowGroup(), flowId.getFlowName());
-  }
-
-  /**
    * Returns a fully-qualified {@link Dag} name that includes: (flowGroup, 
flowName, flowExecutionId).
    * @param dag
    * @return fully qualified name of the underlying {@link Dag}.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index cee56c2..5624963 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -47,9 +48,26 @@ public interface DagStateStore {
   void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
 
   /**
+   * Delete the {@link Dag} from the backing store, typically upon completion 
of execution.
+   * @param dagId The ID of the dag to clean up.
+   */
+  void cleanUp(String dagId) throws IOException;
+
+  /**
    * Load all currently running {@link Dag}s from the underlying store. 
Typically, invoked when a new {@link DagManager}
    * takes over or on restart of service.
    * @return a {@link List} of currently running {@link Dag}s.
    */
   List<Dag<JobExecutionPlan>> getDags() throws IOException;
+
+  /**
+   * Return a single dag from the dag state store.
+   * @param dagId The ID of the dag to load.
+   */
+  Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+
+  /**
+   * Return a list of all dag IDs contained in the dag state store.
+   */
+  Set<String> getDagIds() throws IOException;
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index de8cd0b..6cc3cdb 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -21,10 +21,14 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
@@ -97,7 +101,15 @@ public class FSDagStateStore implements DagStateStore {
    */
   @Override
   public synchronized void cleanUp(Dag<JobExecutionPlan> dag) {
-    String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION;
+    cleanUp(DagManagerUtils.generateDagId(dag));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void cleanUp(String dagId) {
+    String fileName = dagId + DAG_FILE_EXTENSION;
 
     //Delete the dag checkpoint file from the checkpoint directory
     File checkpointFile = new File(this.dagCheckpointDir, fileName);
@@ -121,6 +133,18 @@ public class FSDagStateStore implements DagStateStore {
   }
 
   /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Dag<JobExecutionPlan> getDag(String dagId) throws IOException {
+    File file = new File(this.dagCheckpointDir, dagId + DAG_FILE_EXTENSION);
+    if (!file.exists()) {
+      return null;
+    }
+    return getDag(file);
+  }
+
+  /**
    * Return a {@link Dag} given a file name.
    * @param dagFile
    * @return the {@link Dag} associated with the dagFile.
@@ -132,6 +156,20 @@ public class FSDagStateStore implements DagStateStore {
   }
 
   /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Set<String> getDagIds() {
+    Set<String> dagIds = new HashSet<>();
+    File dagCheckpointFolder = new File(this.dagCheckpointDir);
+
+    for (File file : dagCheckpointFolder.listFiles((dir, name) -> 
name.endsWith(DAG_FILE_EXTENSION))) {
+      dagIds.add(StringUtils.removeEnd(file.getName(), DAG_FILE_EXTENSION));
+    }
+    return dagIds;
+  }
+
+  /**
    * Serialize a {@link Dag<JobExecutionPlan>}.
    * @param dag A Dag parametrized by type {@link JobExecutionPlan}.
    * @return a JSON string representation of the Dag object.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 1eb356a0..95ff619 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -22,12 +22,15 @@ import java.lang.reflect.Type;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlStateStoreEntryManager;
 import org.apache.gobblin.metastore.MysqlStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -36,14 +39,15 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
 import com.google.gson.JsonDeserializer;
 import com.google.gson.JsonSerializer;
 import com.google.gson.reflect.TypeToken;
 import com.typesafe.config.Config;
 
 import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
-import static 
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateFlowIdInString;
-import static 
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.getFlowExecId;
+import static 
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateDagId;
 
 
 /**
@@ -102,13 +106,19 @@ public class MysqlDagStateStore implements DagStateStore {
   @Override
   public void writeCheckpoint(Dag<JobExecutionPlan> dag)
       throws IOException {
-    mysqlStateStore.put(generateFlowIdInString(dag), getFlowExecId(dag) + "", 
convertDagIntoState(dag));
+    mysqlStateStore.put(getStoreNameFromDagId(generateDagId(dag)), 
getTableNameFromDagId(generateDagId(dag)), convertDagIntoState(dag));
   }
 
   @Override
   public void cleanUp(Dag<JobExecutionPlan> dag)
       throws IOException {
-    mysqlStateStore.delete(generateFlowIdInString(dag), getFlowExecId(dag) + 
"");
+    cleanUp(generateDagId(dag));
+  }
+
+  @Override
+  public void cleanUp(String dagId)
+      throws IOException {
+    mysqlStateStore.delete(getStoreNameFromDagId(dagId), 
getTableNameFromDagId(dagId));
   }
 
   @Override
@@ -117,6 +127,44 @@ public class MysqlDagStateStore implements DagStateStore {
     return 
mysqlStateStore.getAll().stream().map(this::convertStateObjIntoDag).collect(Collectors.toList());
   }
 
+  @Override
+  public Dag<JobExecutionPlan> getDag(String dagId) throws IOException {
+    List<State> states = mysqlStateStore.getAll(getStoreNameFromDagId(dagId), 
getTableNameFromDagId(dagId));
+    if (states.isEmpty()) {
+      return null;
+    }
+    return convertStateObjIntoDag(states.get(0));
+  }
+
+  @Override
+  public Set<String> getDagIds() throws IOException {
+    List<MysqlStateStoreEntryManager> entries = 
(List<MysqlStateStoreEntryManager>) mysqlStateStore
+        .getMetadataForTables(new 
StateStorePredicate(Predicates.alwaysTrue()));
+    return entries.stream().map(entry -> entryToDagId(entry.getStoreName(), 
entry.getTableName())).collect(Collectors.toSet());
+  }
+
+  /**
+   * Convert a state store entry into a dag ID
+   * e.g. storeName = group1_name1, tableName = 1234 gives dagId 
group1_name1_1234
+   */
+  private String entryToDagId(String storeName, String tableName) {
+    return Joiner.on("_").join(storeName, tableName);
+  }
+
+  /**
+   * Return a storeName given a dagId. Store name is defined as 
flowGroup_flowName.
+   */
+  private String getStoreNameFromDagId(String dagId) {
+    return dagId.substring(0, dagId.lastIndexOf('_'));
+  }
+
+  /**
+   * Return a tableName given a dagId. Table name is defined as the 
flowExecutionId.
+   */
+  private String getTableNameFromDagId(String dagId) {
+    return dagId.substring(dagId.lastIndexOf('_') + 1);
+  }
+
   /**
    * For {@link Dag} to work with {@link MysqlStateStore}, it needs to be 
packaged into a {@link State} object.
    * The way that it does is simply serialize the {@link Dag} first and use 
the key {@link #DAG_KEY_IN_STATE}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 70cb769..7749903 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -23,15 +23,16 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.metrics.event.TimingEvent;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -69,7 +70,7 @@ public class DagManagerTest {
   private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
   private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
   private Map<String, Dag<JobExecutionPlan>> dags;
-  private Map<String, Dag<JobExecutionPlan>> failedDags;
+  private Set<String> failedDagIds;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -78,12 +79,13 @@ public class DagManagerTest {
         .withValue(FSDagStateStore.DAG_STATESTORE_DIR, 
ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
 
     this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
+    DagStateStore failedDagStateStore = new InMemoryDagStateStore();
     this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     this.queue = new LinkedBlockingQueue<>();
     this.cancelQueue = new LinkedBlockingQueue<>();
     this.resumeQueue = new LinkedBlockingQueue<>();
-    this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
_dagStateStore, queue, cancelQueue,
-        resumeQueue, true, 5, new HashMap<>(), new ConcurrentHashMap<>());
+    this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
failedDagStateStore, queue, cancelQueue,
+        resumeQueue, true, 5, new HashMap<>(), new HashSet<>());
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
@@ -97,9 +99,9 @@ public class DagManagerTest {
     dagsField.setAccessible(true);
     this.dags = (Map<String, Dag<JobExecutionPlan>>) 
dagsField.get(this._dagManagerThread);
 
-    Field failedDagsField = 
DagManager.DagManagerThread.class.getDeclaredField("failedDags");
-    failedDagsField.setAccessible(true);
-    this.failedDags = (Map<String, Dag<JobExecutionPlan>>) 
failedDagsField.get(this._dagManagerThread);
+    Field failedDagIdsField = 
DagManager.DagManagerThread.class.getDeclaredField("failedDagIds");
+    failedDagIdsField.setAccessible(true);
+    this.failedDagIds = (Set<String>) 
failedDagIdsField.get(this._dagManagerThread);
   }
 
   /**
@@ -335,7 +337,7 @@ public class DagManagerTest {
   }
 
   @Test (dependsOnMethods = "testFailedDag")
-  public void testResumeDag() throws URISyntaxException, IOException {
+  public void testResumeDag() throws URISyntaxException {
     long flowExecutionId = System.currentTimeMillis();
     String flowGroupId = "0";
     String flowGroup = "group" + flowGroupId;
@@ -394,19 +396,19 @@ public class DagManagerTest {
       this._dagManagerThread.run();
     }
 
-    Assert.assertTrue(this.failedDags.containsKey(dagId));
+    Assert.assertTrue(this.failedDagIds.contains(dagId));
 
     // Resume dag
     this.resumeQueue.offer(dagId);
 
     // Job2 rerunning
     this._dagManagerThread.run();
-    Assert.assertFalse(this.failedDags.containsKey(dagId));
+    Assert.assertFalse(this.failedDagIds.contains(dagId));
     Assert.assertTrue(this.dags.containsKey(dagId));
 
     // Job2 complete
     this._dagManagerThread.run();
-    Assert.assertFalse(this.failedDags.containsKey(dagId));
+    Assert.assertFalse(this.failedDagIds.contains(dagId));
     Assert.assertFalse(this.dags.containsKey(dagId));
   }
 
@@ -618,20 +620,19 @@ public class DagManagerTest {
     this.cancelQueue.offer(dagId);
 
     this._dagManagerThread.run();
-    Assert.assertTrue(this.failedDags.containsKey(dagId));
-    Assert.assertTrue((this.failedDags.get(dagId).getFlowEvent() == null));
+    Assert.assertTrue(this.failedDagIds.contains(dagId));
 
     // Resume dag
     this.resumeQueue.offer(dagId);
 
     // Job2 rerunning
     this._dagManagerThread.run();
-    Assert.assertFalse(this.failedDags.containsKey(dagId));
+    Assert.assertFalse(this.failedDagIds.contains(dagId));
     Assert.assertTrue(this.dags.containsKey(dagId));
 
     // Job2 complete
     this._dagManagerThread.run();
-    Assert.assertFalse(this.failedDags.containsKey(dagId));
+    Assert.assertFalse(this.failedDagIds.contains(dagId));
     Assert.assertFalse(this.dags.containsKey(dagId));
   }
 
@@ -639,4 +640,32 @@ public class DagManagerTest {
   public void cleanUp() throws Exception {
     FileUtils.deleteDirectory(new File(this.dagStateStoreDir));
   }
+
+  public static class InMemoryDagStateStore implements DagStateStore {
+    private final Map<String, Dag<JobExecutionPlan>> dags = new 
ConcurrentHashMap<>();
+
+    public void writeCheckpoint(Dag<JobExecutionPlan> dag) {
+      dags.put(DagManagerUtils.generateDagId(dag), dag);
+    }
+
+    public void cleanUp(Dag<JobExecutionPlan> dag) {
+      cleanUp(DagManagerUtils.generateDagId(dag));
+    }
+
+    public void cleanUp(String dagId) {
+      dags.remove(dagId);
+    }
+
+    public List<Dag<JobExecutionPlan>> getDags() {
+      return new ArrayList<>(dags.values());
+    }
+
+    public Dag<JobExecutionPlan> getDag(String dagId) {
+      return dags.get(dagId);
+    }
+
+    public Set<String> getDagIds() {
+      return dags.keySet();
+    }
+  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index b2fa5f5..849b880 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.io.FileUtils;
@@ -108,6 +109,11 @@ public class FSDagStateStoreTest {
     Assert.assertTrue(dagFile.exists());
     this._dagStateStore.cleanUp(dag);
     Assert.assertFalse(dagFile.exists());
+
+    this._dagStateStore.writeCheckpoint(dag);
+    Assert.assertTrue(dagFile.exists());
+    this._dagStateStore.cleanUp(DagManagerUtils.generateDagId(dag));
+    Assert.assertFalse(dagFile.exists());
   }
 
   @Test (dependsOnMethods = "testCleanUp")
@@ -123,6 +129,8 @@ public class FSDagStateStoreTest {
 
     List<Dag<JobExecutionPlan>> dags = this._dagStateStore.getDags();
     Assert.assertEquals(dags.size(), 2);
+    Dag<JobExecutionPlan> singleDag = 
this._dagStateStore.getDag(DagManagerUtils.generateDagId(dags.get(0)));
+    dags.add(singleDag);
     for (Dag<JobExecutionPlan> dag: dags) {
       Assert.assertEquals(dag.getNodes().size(), 2);
       Assert.assertEquals(dag.getStartNodes().size(), 1);
@@ -133,6 +141,12 @@ public class FSDagStateStoreTest {
       
Assert.assertTrue(Boolean.parseBoolean(dag.getNodes().get(0).getValue().getJobFuture().get().get().toString()));
       
Assert.assertTrue(Boolean.parseBoolean(dag.getNodes().get(1).getValue().getJobFuture().get().get().toString()));
     }
+
+    Set<String> dagIds = this._dagStateStore.getDagIds();
+    Assert.assertEquals(dagIds.size(), 2);
+    for (Dag<JobExecutionPlan> dag: dags) {
+      Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag)));
+    }
   }
 
   @AfterClass
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index a4d39e6..60f0f6c 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -21,6 +21,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.dbcp.BasicDataSource;
 import org.testng.Assert;
@@ -72,12 +73,24 @@ public class MysqlDagStateStoreTest {
 
 
   @Test
-  public void testWriteCheckpointAndGetAll() throws Exception{
+  public void testWriteCheckpointAndGet() throws Exception{
     Dag<JobExecutionPlan> dag_0 = DagTestUtils.buildDag("random_0", 123L);
     Dag<JobExecutionPlan> dag_1 = DagTestUtils.buildDag("random_1", 456L);
     _dagStateStore.writeCheckpoint(dag_0);
     _dagStateStore.writeCheckpoint(dag_1);
 
+    // Verify get one dag
+    Dag<JobExecutionPlan> dag = 
_dagStateStore.getDag(DagManagerUtils.generateDagId(dag_0));
+    Assert.assertEquals(dag.getNodes().get(0), dag_0.getNodes().get(0));
+    Assert.assertEquals(dag.getNodes().get(1), dag_0.getNodes().get(1));
+
+    // Verify get dagIds
+    Set<String> dagIds = _dagStateStore.getDagIds();
+    Assert.assertEquals(dagIds.size(), 2);
+    Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_0)));
+    Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_1)));
+
+    // Verify get all dags
     List<Dag<JobExecutionPlan>> dags = _dagStateStore.getDags();
     Assert.assertEquals(dags.size(), 2);
 
@@ -121,7 +134,7 @@ public class MysqlDagStateStoreTest {
     }
   }
 
-  @Test (dependsOnMethods = "testWriteCheckpointAndGetAll")
+  @Test (dependsOnMethods = "testWriteCheckpointAndGet")
   public void testCleanUp() throws Exception {
     Dag<JobExecutionPlan> dag_0 = DagTestUtils.buildDag("random_0", 123L);
     Dag<JobExecutionPlan> dag_1 = DagTestUtils.buildDag("random_1", 456L);
@@ -132,7 +145,7 @@ public class MysqlDagStateStoreTest {
     Assert.assertEquals(dags.size(), 2);
 
     _dagStateStore.cleanUp(dags.get(0));
-    _dagStateStore.cleanUp(dags.get(1));
+    _dagStateStore.cleanUp(DagManagerUtils.generateDagId(dags.get(1)));
 
     dags = _dagStateStore.getDags();
     Assert.assertEquals(dags.size(), 0);

Reply via email to