This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2c5122d [GOBBLIN-1380] Add retention to failed dag state store
2c5122d is described below
commit 2c5122d2156e7b3621b1f13cc8080d96de03c700
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Feb 9 11:20:57 2021 -0800
[GOBBLIN-1380] Add retention to failed dag state store
Closes #3220 from jack-moseley/dag-retention
---
.../service/modules/orchestration/DagManager.java | 63 ++++++++++++++++++++--
.../modules/orchestration/DagManagerTest.java | 3 +-
2 files changed, 61 insertions(+), 5 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index bd22f74..2b8872e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -127,6 +127,12 @@ public class DagManager extends AbstractIdleService {
private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS =
FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX +
"dagStateStoreClass";
private static final String FAILED_DAG_STATESTORE_PREFIX =
"failedDagStateStore";
+ private static final String FAILED_DAG_RETENTION_TIME_UNIT =
FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
+ private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
+ private static final String FAILED_DAG_RETENTION_TIME =
FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
+ private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
+ public static final String FAILED_DAG_POLLING_INTERVAL =
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
+ public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX +
"defaultJobQuota";
private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX +
"perUserQuota";
@@ -172,12 +178,15 @@ public class DagManager extends AbstractIdleService {
@Getter
private final Integer numThreads;
private final Integer pollingInterval;
+ private final Integer retentionPollingInterval;
@Getter
private final JobStatusRetriever jobStatusRetriever;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
+ private final long failedDagRetentionTime;
+ private final Map<String, Dag<JobExecutionPlan>> failedDags = new
ConcurrentHashMap<>();
private volatile boolean isActive = false;
@@ -189,6 +198,7 @@ public class DagManager extends AbstractIdleService {
this.resumeQueue = initializeDagQueue(this.numThreads);
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+ this.retentionPollingInterval = ConfigUtils.getInt(config,
FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
if (instrumentationEnabled) {
MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
@@ -208,6 +218,9 @@ public class DagManager extends AbstractIdleService {
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Exception encountered during DagManager
initialization", e);
}
+
+ TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
+ this.failedDagRetentionTime =
timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME,
DEFAULT_FAILED_DAG_RETENTION_TIME));
}
JobStatusRetriever createJobStatusRetriever(Config config) throws
ReflectiveOperationException {
@@ -362,10 +375,12 @@ public class DagManager extends AbstractIdleService {
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
- queue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, defaultQuota, perUserQuota);
+ queue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, defaultQuota, perUserQuota, failedDags);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
+ FailedDagRetentionThread failedDagRetentionThread = new
FailedDagRetentionThread(failedDagStateStore, failedDags,
failedDagRetentionTime);
+
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0,
retentionPollingInterval, TimeUnit.MINUTES);
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
log.info("Loading " + dags.size() + " dags from dag state store");
for (Dag<JobExecutionPlan> dag : dags) {
@@ -399,7 +414,7 @@ public class DagManager extends AbstractIdleService {
private static final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
private static final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
- private final Map<String, Dag<JobExecutionPlan>> failedDags = new
HashMap<>();
+ private Map<String, Dag<JobExecutionPlan>> failedDags;
private final Map<String, Dag<JobExecutionPlan>> resumingDags = new
HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new
HashMap<>();
@@ -425,11 +440,12 @@ public class DagManager extends AbstractIdleService {
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, BlockingQueue<String> resumeQueue,
- boolean instrumentationEnabled, int defaultQuota, Map<String, Integer>
perUserQuota) {
+ boolean instrumentationEnabled, int defaultQuota, Map<String, Integer>
perUserQuota, Map<String, Dag<JobExecutionPlan>> failedDags) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
try {
+ this.failedDags = failedDags;
for (Dag<JobExecutionPlan> dag : failedDagStateStore.getDags()) {
this.failedDags.put(DagManagerUtils.generateDagId(dag), dag);
}
@@ -1108,7 +1124,7 @@ public class DagManager extends AbstractIdleService {
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete
and update internal state.
*/
- private void cleanUp() {
+ private void cleanUp() throws IOException {
List<String> dagIdstoClean = new ArrayList<>();
//Clean up failed dags
for (String dagId : this.failedDagIdsFinishRunning) {
@@ -1176,6 +1192,45 @@ public class DagManager extends AbstractIdleService {
}
}
+ /**
+ * Thread that runs retention on failed dags based on their original start
time (which is the flow execution ID).
+ */
+ public static class FailedDagRetentionThread implements Runnable {
+ private final DagStateStore failedDagStateStore;
+ private final Map<String, Dag<JobExecutionPlan>> failedDags;
+ private final long failedDagRetentionTime;
+
+ FailedDagRetentionThread(DagStateStore failedDagStateStore, Map<String,
Dag<JobExecutionPlan>> failedDags, long failedDagRetentionTime) {
+ this.failedDagStateStore = failedDagStateStore;
+ this.failedDags = failedDags;
+ this.failedDagRetentionTime = failedDagRetentionTime;
+ }
+
+ @Override
+ public void run() {
+ try {
+ log.info("Cleaning failed dag state store");
+ long startTime = System.currentTimeMillis();
+ List<String> dagKeysToClean = new ArrayList<>();
+
+ for (Map.Entry<String, Dag<JobExecutionPlan>> entry :
this.failedDags.entrySet()) {
+ if (this.failedDagRetentionTime > 0L && startTime >
DagManagerUtils.getFlowExecId(entry.getValue()) + this.failedDagRetentionTime) {
+ this.failedDagStateStore.cleanUp(entry.getValue());
+ dagKeysToClean.add(entry.getKey());
+ }
+ }
+
+ for (String key : dagKeysToClean) {
+ this.failedDags.remove(key);
+ }
+
+ log.info("Cleaned " + dagKeysToClean.size() + " from the failed dag
state store");
+ } catch (Exception e) {
+ log.error("Failed to run retention on failed dag state store", e);
+ }
+ }
+ }
+
/** Stop the service. */
@Override
protected void shutDown()
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index a39457f..62fa545 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FileUtils;
@@ -81,7 +82,7 @@ public class DagManagerTest {
this.cancelQueue = new LinkedBlockingQueue<>();
this.resumeQueue = new LinkedBlockingQueue<>();
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
_dagStateStore, queue, cancelQueue,
- resumeQueue, true, 5, new HashMap<>());
+ resumeQueue, true, 5, new HashMap<>(), new ConcurrentHashMap<>());
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);