This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7350c20 [GOBBLIN-1438] Only load failed dags at time of resume
7350c20 is described below
commit 7350c20076afc83dc1110dc5e7ec1ca650a367b5
Author: Jack Moseley <[email protected]>
AuthorDate: Thu May 6 18:01:19 2021 -0700
[GOBBLIN-1438] Only load failed dags at time of resume
Closes #3272 from jack-moseley/failed-dag-memory
---
.../service/modules/orchestration/DagManager.java | 56 ++++++++++----------
.../modules/orchestration/DagManagerUtils.java | 16 ++----
.../modules/orchestration/DagStateStore.java | 18 +++++++
.../modules/orchestration/FSDagStateStore.java | 40 ++++++++++++++-
.../modules/orchestration/MysqlDagStateStore.java | 56 ++++++++++++++++++--
.../modules/orchestration/DagManagerTest.java | 59 ++++++++++++++++------
.../modules/orchestration/FSDagStateStoreTest.java | 14 +++++
.../orchestration/MysqlDagStateStoreTest.java | 19 +++++--
8 files changed, 214 insertions(+), 64 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index da46d72..5ca1fed 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -188,7 +189,6 @@ public class DagManager extends AbstractIdleService {
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
private final long failedDagRetentionTime;
- private final Map<String, Dag<JobExecutionPlan>> failedDags = new
ConcurrentHashMap<>();
private volatile boolean isActive = false;
@@ -372,16 +372,17 @@ public class DagManager extends AbstractIdleService {
//Initializing state store for persisting Dags.
this.dagStateStore = createDagStateStore(config, topologySpecMap);
this.failedDagStateStore =
createDagStateStore(ConfigUtils.getConfigOrEmpty(config,
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+ Set<String> failedDagIds =
Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
//On startup, the service creates DagManagerThreads that are scheduled
at a fixed rate.
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
- queue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, defaultQuota, perUserQuota, failedDags);
+ queue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
- FailedDagRetentionThread failedDagRetentionThread = new
FailedDagRetentionThread(failedDagStateStore, failedDags,
failedDagRetentionTime);
+ FailedDagRetentionThread failedDagRetentionThread = new
FailedDagRetentionThread(failedDagStateStore, failedDagIds,
failedDagRetentionTime);
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0,
retentionPollingInterval, TimeUnit.MINUTES);
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
log.info("Loading " + dags.size() + " dags from dag state store");
@@ -416,7 +417,7 @@ public class DagManager extends AbstractIdleService {
private static final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
private static final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
- private Map<String, Dag<JobExecutionPlan>> failedDags;
+ private Set<String> failedDagIds;
private final Map<String, Dag<JobExecutionPlan>> resumingDags = new
HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new
HashMap<>();
@@ -443,18 +444,11 @@ public class DagManager extends AbstractIdleService {
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, BlockingQueue<String> resumeQueue,
- boolean instrumentationEnabled, int defaultQuota, Map<String, Integer>
perUserQuota, Map<String, Dag<JobExecutionPlan>> failedDags) {
+ boolean instrumentationEnabled, int defaultQuota, Map<String, Integer>
perUserQuota, Set<String> failedDagIds) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
- try {
- this.failedDags = failedDags;
- for (Dag<JobExecutionPlan> dag : failedDagStateStore.getDags()) {
- this.failedDags.put(DagManagerUtils.generateDagId(dag), dag);
- }
- } catch (IOException e) {
- log.error("Failed to load previously failed dags into memory", e);
- }
+ this.failedDagIds = failedDagIds;
this.queue = queue;
this.cancelQueue = cancelQueue;
this.resumeQueue = resumeQueue;
@@ -524,12 +518,16 @@ public class DagManager extends AbstractIdleService {
* Begin resuming a dag by setting the status of both the dag and the
failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
* and also sending events so that this status will be reflected in the
job status state store.
*/
- private void beginResumingDag(String dagId) {
- Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
- if (dag == null) {
+ private void beginResumingDag(String dagId) throws IOException {
+ if (!this.failedDagIds.contains(dagId)) {
log.warn("No dag found with dagId " + dagId + ", so cannot resume
flow");
return;
}
+ Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
+ if (dag == null) {
+ log.error("Dag " + dagId + " was found in memory but not found in
failed dag state store");
+ return;
+ }
long flowResumeTime = System.currentTimeMillis();
@@ -575,7 +573,7 @@ public class DagManager extends AbstractIdleService {
if (dagReady) {
this.dagStateStore.writeCheckpoint(dag.getValue());
this.failedDagStateStore.cleanUp(dag.getValue());
- this.failedDags.remove(dag.getKey());
+ this.failedDagIds.remove(dag.getKey());
this.resumingDags.remove(dag.getKey());
initialize(dag.getValue());
}
@@ -1190,7 +1188,7 @@ public class DagManager extends AbstractIdleService {
} catch (IOException e) {
log.error("Failed to add dag " + dagId + " to failed dag state store",
e);
}
- this.failedDags.put(dagId, this.dags.get(dagId));
+ this.failedDagIds.add(dagId);
}
/**
@@ -1229,12 +1227,12 @@ public class DagManager extends AbstractIdleService {
*/
public static class FailedDagRetentionThread implements Runnable {
private final DagStateStore failedDagStateStore;
- private final Map<String, Dag<JobExecutionPlan>> failedDags;
+ private final Set<String> failedDagIds;
private final long failedDagRetentionTime;
- FailedDagRetentionThread(DagStateStore failedDagStateStore, Map<String,
Dag<JobExecutionPlan>> failedDags, long failedDagRetentionTime) {
+ FailedDagRetentionThread(DagStateStore failedDagStateStore, Set<String>
failedDagIds, long failedDagRetentionTime) {
this.failedDagStateStore = failedDagStateStore;
- this.failedDags = failedDags;
+ this.failedDagIds = failedDagIds;
this.failedDagRetentionTime = failedDagRetentionTime;
}
@@ -1243,20 +1241,20 @@ public class DagManager extends AbstractIdleService {
try {
log.info("Cleaning failed dag state store");
long startTime = System.currentTimeMillis();
- List<String> dagKeysToClean = new ArrayList<>();
+ List<String> dagIdsToClean = new ArrayList<>();
- for (Map.Entry<String, Dag<JobExecutionPlan>> entry :
this.failedDags.entrySet()) {
- if (this.failedDagRetentionTime > 0L && startTime >
DagManagerUtils.getFlowExecId(entry.getValue()) + this.failedDagRetentionTime) {
- this.failedDagStateStore.cleanUp(entry.getValue());
- dagKeysToClean.add(entry.getKey());
+ for (String dagId : this.failedDagIds) {
+ if (this.failedDagRetentionTime > 0L && startTime >
DagManagerUtils.getFlowExecId(dagId) + this.failedDagRetentionTime) {
+ this.failedDagStateStore.cleanUp(dagId);
+ dagIdsToClean.add(dagId);
}
}
- for (String key : dagKeysToClean) {
- this.failedDags.remove(key);
+ for (String dagId : dagIdsToClean) {
+ this.failedDagIds.remove(dagId);
}
- log.info("Cleaned " + dagKeysToClean.size() + " from the failed dag
state store");
+ log.info("Cleaned " + dagIdsToClean.size() + " dags from the failed dag
state store");
} catch (Exception e) {
log.error("Failed to run retention on failed dag state store", e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index b55b0e1..e2bd0d1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -71,6 +71,10 @@ public class DagManagerUtils {
return
jobSpec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
}
+ /**
+  * Extract the flow execution id from a dagId by parsing the text that follows the last '_' as a long.
+  * If the dagId contains no '_', the whole string is parsed.
+  * @param dagId the dag ID to parse.
+  * @return the parsed flow execution id.
+  * @throws NumberFormatException if the text after the last '_' is not a parsable long.
+  */
+ static long getFlowExecId(String dagId) {
+   int lastSeparator = dagId.lastIndexOf('_');
+   String flowExecIdText = dagId.substring(lastSeparator + 1);
+   return Long.parseLong(flowExecIdText);
+ }
+
/**
* Generate a dagId from the given {@link Dag} instance.
* @param dag instance of a {@link Dag}.
@@ -97,18 +101,6 @@ public class DagManagerUtils {
}
/**
- * Generate a FlowId from the given {@link Dag} instance.
- * FlowId, comparing to DagId, doesn't contain FlowExecutionId so different
{@link Dag} could possibly have same
- * {@link FlowId}.
- * @param dag
- * @return
- */
- static String generateFlowIdInString(Dag<JobExecutionPlan> dag) {
- FlowId flowId = getFlowId(dag);
- return Joiner.on("_").join(flowId.getFlowGroup(), flowId.getFlowName());
- }
-
- /**
* Returns a fully-qualified {@link Dag} name that includes: (flowGroup,
flowName, flowExecutionId).
* @param dag
* @return fully qualified name of the underlying {@link Dag}.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
index cee56c2..5624963 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -47,9 +48,26 @@ public interface DagStateStore {
void cleanUp(Dag<JobExecutionPlan> dag) throws IOException;
/**
+ * Delete the {@link Dag} from the backing store, typically upon completion
of execution.
+ * @param dagId The ID of the dag to clean up.
+ */
+ void cleanUp(String dagId) throws IOException;
+
+ /**
* Load all currently running {@link Dag}s from the underlying store.
Typically, invoked when a new {@link DagManager}
* takes over or on restart of service.
* @return a {@link List} of currently running {@link Dag}s.
*/
List<Dag<JobExecutionPlan>> getDags() throws IOException;
+
+ /**
+ * Return a single dag from the dag state store.
+ * @param dagId The ID of the dag to load.
+ */
+ Dag<JobExecutionPlan> getDag(String dagId) throws IOException;
+
+ /**
+ * Return a list of all dag IDs contained in the dag state store.
+ */
+ Set<String> getDagIds() throws IOException;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index de8cd0b..6cc3cdb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -21,10 +21,14 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
@@ -97,7 +101,15 @@ public class FSDagStateStore implements DagStateStore {
*/
@Override
public synchronized void cleanUp(Dag<JobExecutionPlan> dag) {
- String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION;
+ cleanUp(DagManagerUtils.generateDagId(dag));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void cleanUp(String dagId) {
+ String fileName = dagId + DAG_FILE_EXTENSION;
//Delete the dag checkpoint file from the checkpoint directory
File checkpointFile = new File(this.dagCheckpointDir, fileName);
@@ -121,6 +133,18 @@ public class FSDagStateStore implements DagStateStore {
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public Dag<JobExecutionPlan> getDag(String dagId) throws IOException {
+   // The checkpoint for a dag is the file named dagId + DAG_FILE_EXTENSION inside dagCheckpointDir.
+   File checkpointFile = new File(this.dagCheckpointDir, dagId + DAG_FILE_EXTENSION);
+   // A missing checkpoint file is reported to the caller as null rather than as an exception.
+   return checkpointFile.exists() ? getDag(checkpointFile) : null;
+ }
+
+ /**
* Return a {@link Dag} given a file name.
* @param dagFile
* @return the {@link Dag} associated with the dagFile.
@@ -132,6 +156,20 @@ public class FSDagStateStore implements DagStateStore {
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public Set<String> getDagIds() {
+   Set<String> dagIds = new HashSet<>();
+   File dagCheckpointFolder = new File(this.dagCheckpointDir);
+
+   // File#listFiles returns null (not an empty array) when the path is not a directory or an I/O error
+   // occurs; treat that as "no checkpointed dags" instead of failing with a NullPointerException.
+   File[] dagFiles = dagCheckpointFolder.listFiles((dir, name) -> name.endsWith(DAG_FILE_EXTENSION));
+   if (dagFiles == null) {
+     return dagIds;
+   }
+
+   // The dag ID is the checkpoint file name with the trailing DAG_FILE_EXTENSION removed.
+   for (File file : dagFiles) {
+     dagIds.add(StringUtils.removeEnd(file.getName(), DAG_FILE_EXTENSION));
+   }
+   return dagIds;
+ }
+
+ /**
* Serialize a {@link Dag<JobExecutionPlan>}.
* @param dag A Dag parametrized by type {@link JobExecutionPlan}.
* @return a JSON string representation of the Dag object.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 1eb356a0..95ff619 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -22,12 +22,15 @@ import java.lang.reflect.Type;
import java.net.URI;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlStateStoreEntryManager;
import org.apache.gobblin.metastore.MysqlStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -36,14 +39,15 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonSerializer;
import com.google.gson.reflect.TypeToken;
import com.typesafe.config.Config;
import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
-import static
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateFlowIdInString;
-import static
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.getFlowExecId;
+import static
org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateDagId;
/**
@@ -102,13 +106,19 @@ public class MysqlDagStateStore implements DagStateStore {
@Override
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
throws IOException {
- mysqlStateStore.put(generateFlowIdInString(dag), getFlowExecId(dag) + "",
convertDagIntoState(dag));
+ mysqlStateStore.put(getStoreNameFromDagId(generateDagId(dag)),
getTableNameFromDagId(generateDagId(dag)), convertDagIntoState(dag));
}
@Override
public void cleanUp(Dag<JobExecutionPlan> dag)
throws IOException {
- mysqlStateStore.delete(generateFlowIdInString(dag), getFlowExecId(dag) +
"");
+ cleanUp(generateDagId(dag));
+ }
+
+ @Override
+ public void cleanUp(String dagId)
+     throws IOException {
+   // A dag is addressed in the state store by the (storeName, tableName) pair derived from its dagId.
+   String storeName = getStoreNameFromDagId(dagId);
+   String tableName = getTableNameFromDagId(dagId);
+   mysqlStateStore.delete(storeName, tableName);
+ }
@Override
@@ -117,6 +127,44 @@ public class MysqlDagStateStore implements DagStateStore {
return
mysqlStateStore.getAll().stream().map(this::convertStateObjIntoDag).collect(Collectors.toList());
}
+ @Override
+ public Dag<JobExecutionPlan> getDag(String dagId) throws IOException {
+   String storeName = getStoreNameFromDagId(dagId);
+   String tableName = getTableNameFromDagId(dagId);
+   List<State> matches = mysqlStateStore.getAll(storeName, tableName);
+   // No stored state for this dagId is reported as null; otherwise only the first state is converted.
+   return matches.isEmpty() ? null : convertStateObjIntoDag(matches.get(0));
+ }
+
+ // Collects the dag IDs of the entries in the state store: each ID is rebuilt by entryToDagId from the
+ // entry's store name and table name, without calling convertStateObjIntoDag on any entry.
+ @Override
+ public Set<String> getDagIds() throws IOException {
+ // The predicate is built from Predicates.alwaysTrue(), so this predicate filters nothing out.
+ // NOTE(review): unchecked cast -- assumes getMetadataForTables yields MysqlStateStoreEntryManager
+ // instances for this store; confirm against MysqlStateStore.
+ List<MysqlStateStoreEntryManager> entries =
(List<MysqlStateStoreEntryManager>) mysqlStateStore
+ .getMetadataForTables(new
StateStorePredicate(Predicates.alwaysTrue()));
+ return entries.stream().map(entry -> entryToDagId(entry.getStoreName(),
entry.getTableName())).collect(Collectors.toSet());
+ }
+
+ /**
+  * Build the dag ID for a state store entry by joining its store name and table name with '_'.
+  * e.g. storeName = group1_name1, tableName = 1234 gives dagId group1_name1_1234
+  * @param storeName the entry's store name.
+  * @param tableName the entry's table name.
+  * @return storeName and tableName joined by '_'.
+  */
+ private String entryToDagId(String storeName, String tableName) {
+   Joiner underscoreJoiner = Joiner.on("_");
+   return underscoreJoiner.join(storeName, tableName);
+ }
+
+ /**
+  * Derive the store name for a dag: everything in the dagId before its last '_'.
+  * Store name is defined as flowGroup_flowName.
+  * A dagId that contains no '_' makes {@link String#substring(int, int)} throw
+  * {@link StringIndexOutOfBoundsException}, because the end index is then -1.
+  * @param dagId the dag ID to split.
+  * @return the part of dagId preceding the last '_'.
+  */
+ private String getStoreNameFromDagId(String dagId) {
+   int lastSeparator = dagId.lastIndexOf('_');
+   return dagId.substring(0, lastSeparator);
+ }
+
+ /**
+  * Derive the table name for a dag: everything in the dagId after its last '_'.
+  * Table name is defined as the flowExecutionId.
+  * A dagId that contains no '_' is returned unchanged, because the start index is then 0.
+  * @param dagId the dag ID to split.
+  * @return the part of dagId following the last '_'.
+  */
+ private String getTableNameFromDagId(String dagId) {
+   int lastSeparator = dagId.lastIndexOf('_');
+   return dagId.substring(lastSeparator + 1);
+ }
+
/**
* For {@link Dag} to work with {@link MysqlStateStore}, it needs to be
packaged into a {@link State} object.
* The way that it does is simply serialize the {@link Dag} first and use
the key {@link #DAG_KEY_IN_STATE}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 70cb769..7749903 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -23,15 +23,16 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.metrics.event.TimingEvent;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -69,7 +70,7 @@ public class DagManagerTest {
private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
private Map<String, Dag<JobExecutionPlan>> dags;
- private Map<String, Dag<JobExecutionPlan>> failedDags;
+ private Set<String> failedDagIds;
@BeforeClass
public void setUp() throws Exception {
@@ -78,12 +79,13 @@ public class DagManagerTest {
.withValue(FSDagStateStore.DAG_STATESTORE_DIR,
ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
+ DagStateStore failedDagStateStore = new InMemoryDagStateStore();
this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
this.queue = new LinkedBlockingQueue<>();
this.cancelQueue = new LinkedBlockingQueue<>();
this.resumeQueue = new LinkedBlockingQueue<>();
- this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
_dagStateStore, queue, cancelQueue,
- resumeQueue, true, 5, new HashMap<>(), new ConcurrentHashMap<>());
+ this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
failedDagStateStore, queue, cancelQueue,
+ resumeQueue, true, 5, new HashMap<>(), new HashSet<>());
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
@@ -97,9 +99,9 @@ public class DagManagerTest {
dagsField.setAccessible(true);
this.dags = (Map<String, Dag<JobExecutionPlan>>)
dagsField.get(this._dagManagerThread);
- Field failedDagsField =
DagManager.DagManagerThread.class.getDeclaredField("failedDags");
- failedDagsField.setAccessible(true);
- this.failedDags = (Map<String, Dag<JobExecutionPlan>>)
failedDagsField.get(this._dagManagerThread);
+ Field failedDagIdsField =
DagManager.DagManagerThread.class.getDeclaredField("failedDagIds");
+ failedDagIdsField.setAccessible(true);
+ this.failedDagIds = (Set<String>)
failedDagIdsField.get(this._dagManagerThread);
}
/**
@@ -335,7 +337,7 @@ public class DagManagerTest {
}
@Test (dependsOnMethods = "testFailedDag")
- public void testResumeDag() throws URISyntaxException, IOException {
+ public void testResumeDag() throws URISyntaxException {
long flowExecutionId = System.currentTimeMillis();
String flowGroupId = "0";
String flowGroup = "group" + flowGroupId;
@@ -394,19 +396,19 @@ public class DagManagerTest {
this._dagManagerThread.run();
}
- Assert.assertTrue(this.failedDags.containsKey(dagId));
+ Assert.assertTrue(this.failedDagIds.contains(dagId));
// Resume dag
this.resumeQueue.offer(dagId);
// Job2 rerunning
this._dagManagerThread.run();
- Assert.assertFalse(this.failedDags.containsKey(dagId));
+ Assert.assertFalse(this.failedDagIds.contains(dagId));
Assert.assertTrue(this.dags.containsKey(dagId));
// Job2 complete
this._dagManagerThread.run();
- Assert.assertFalse(this.failedDags.containsKey(dagId));
+ Assert.assertFalse(this.failedDagIds.contains(dagId));
Assert.assertFalse(this.dags.containsKey(dagId));
}
@@ -618,20 +620,19 @@ public class DagManagerTest {
this.cancelQueue.offer(dagId);
this._dagManagerThread.run();
- Assert.assertTrue(this.failedDags.containsKey(dagId));
- Assert.assertTrue((this.failedDags.get(dagId).getFlowEvent() == null));
+ Assert.assertTrue(this.failedDagIds.contains(dagId));
// Resume dag
this.resumeQueue.offer(dagId);
// Job2 rerunning
this._dagManagerThread.run();
- Assert.assertFalse(this.failedDags.containsKey(dagId));
+ Assert.assertFalse(this.failedDagIds.contains(dagId));
Assert.assertTrue(this.dags.containsKey(dagId));
// Job2 complete
this._dagManagerThread.run();
- Assert.assertFalse(this.failedDags.containsKey(dagId));
+ Assert.assertFalse(this.failedDagIds.contains(dagId));
Assert.assertFalse(this.dags.containsKey(dagId));
}
@@ -639,4 +640,32 @@ public class DagManagerTest {
public void cleanUp() throws Exception {
FileUtils.deleteDirectory(new File(this.dagStateStoreDir));
}
+
+ /**
+  * A {@link DagStateStore} that keeps every checkpointed dag in an in-memory map keyed by the ID
+  * produced by {@link DagManagerUtils#generateDagId}. Nothing is persisted.
+  */
+ public static class InMemoryDagStateStore implements DagStateStore {
+   private final Map<String, Dag<JobExecutionPlan>> dags = new ConcurrentHashMap<>();
+
+   /** Store the dag under its generated dag ID, replacing any dag already stored under that ID. */
+   @Override
+   public void writeCheckpoint(Dag<JobExecutionPlan> dag) {
+     dags.put(DagManagerUtils.generateDagId(dag), dag);
+   }
+
+   /** Remove the dag stored under the ID generated for the given dag, if any. */
+   @Override
+   public void cleanUp(Dag<JobExecutionPlan> dag) {
+     cleanUp(DagManagerUtils.generateDagId(dag));
+   }
+
+   /** Remove the dag stored under the given dag ID, if any. */
+   @Override
+   public void cleanUp(String dagId) {
+     dags.remove(dagId);
+   }
+
+   /** Return a new list holding the dags currently stored. */
+   @Override
+   public List<Dag<JobExecutionPlan>> getDags() {
+     return new ArrayList<>(dags.values());
+   }
+
+   /** Return the dag stored under the given dag ID, or null if there is none. */
+   @Override
+   public Dag<JobExecutionPlan> getDag(String dagId) {
+     return dags.get(dagId);
+   }
+
+   /** Return a snapshot of the IDs of the dags currently stored. */
+   @Override
+   public Set<String> getDagIds() {
+     // Hand out a detached copy: ConcurrentHashMap#keySet() is a live view that rejects add() and whose
+     // remove() would delete the dag from this store, so callers that mutate the returned set must not
+     // receive the view itself.
+     return new HashSet<>(dags.keySet());
+   }
+ }
}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index b2fa5f5..849b880 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
@@ -108,6 +109,11 @@ public class FSDagStateStoreTest {
Assert.assertTrue(dagFile.exists());
this._dagStateStore.cleanUp(dag);
Assert.assertFalse(dagFile.exists());
+
+ this._dagStateStore.writeCheckpoint(dag);
+ Assert.assertTrue(dagFile.exists());
+ this._dagStateStore.cleanUp(DagManagerUtils.generateDagId(dag));
+ Assert.assertFalse(dagFile.exists());
}
@Test (dependsOnMethods = "testCleanUp")
@@ -123,6 +129,8 @@ public class FSDagStateStoreTest {
List<Dag<JobExecutionPlan>> dags = this._dagStateStore.getDags();
Assert.assertEquals(dags.size(), 2);
+ Dag<JobExecutionPlan> singleDag =
this._dagStateStore.getDag(DagManagerUtils.generateDagId(dags.get(0)));
+ dags.add(singleDag);
for (Dag<JobExecutionPlan> dag: dags) {
Assert.assertEquals(dag.getNodes().size(), 2);
Assert.assertEquals(dag.getStartNodes().size(), 1);
@@ -133,6 +141,12 @@ public class FSDagStateStoreTest {
Assert.assertTrue(Boolean.parseBoolean(dag.getNodes().get(0).getValue().getJobFuture().get().get().toString()));
Assert.assertTrue(Boolean.parseBoolean(dag.getNodes().get(1).getValue().getJobFuture().get().get().toString()));
}
+
+ Set<String> dagIds = this._dagStateStore.getDagIds();
+ Assert.assertEquals(dagIds.size(), 2);
+ for (Dag<JobExecutionPlan> dag: dags) {
+ Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag)));
+ }
}
@AfterClass
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index a4d39e6..60f0f6c 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -21,6 +21,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.dbcp.BasicDataSource;
import org.testng.Assert;
@@ -72,12 +73,24 @@ public class MysqlDagStateStoreTest {
@Test
- public void testWriteCheckpointAndGetAll() throws Exception{
+ public void testWriteCheckpointAndGet() throws Exception{
Dag<JobExecutionPlan> dag_0 = DagTestUtils.buildDag("random_0", 123L);
Dag<JobExecutionPlan> dag_1 = DagTestUtils.buildDag("random_1", 456L);
_dagStateStore.writeCheckpoint(dag_0);
_dagStateStore.writeCheckpoint(dag_1);
+ // Verify get one dag
+ Dag<JobExecutionPlan> dag =
_dagStateStore.getDag(DagManagerUtils.generateDagId(dag_0));
+ Assert.assertEquals(dag.getNodes().get(0), dag_0.getNodes().get(0));
+ Assert.assertEquals(dag.getNodes().get(1), dag_0.getNodes().get(1));
+
+ // Verify get dagIds
+ Set<String> dagIds = _dagStateStore.getDagIds();
+ Assert.assertEquals(dagIds.size(), 2);
+ Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_0)));
+ Assert.assertTrue(dagIds.contains(DagManagerUtils.generateDagId(dag_1)));
+
+ // Verify get all dags
List<Dag<JobExecutionPlan>> dags = _dagStateStore.getDags();
Assert.assertEquals(dags.size(), 2);
@@ -121,7 +134,7 @@ public class MysqlDagStateStoreTest {
}
}
- @Test (dependsOnMethods = "testWriteCheckpointAndGetAll")
+ @Test (dependsOnMethods = "testWriteCheckpointAndGet")
public void testCleanUp() throws Exception {
Dag<JobExecutionPlan> dag_0 = DagTestUtils.buildDag("random_0", 123L);
Dag<JobExecutionPlan> dag_1 = DagTestUtils.buildDag("random_1", 456L);
@@ -132,7 +145,7 @@ public class MysqlDagStateStoreTest {
Assert.assertEquals(dags.size(), 2);
_dagStateStore.cleanUp(dags.get(0));
- _dagStateStore.cleanUp(dags.get(1));
+ _dagStateStore.cleanUp(DagManagerUtils.generateDagId(dags.get(1)));
dags = _dagStateStore.getDags();
Assert.assertEquals(dags.size(), 0);