This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c5122d  [GOBBLIN-1380] Add retention to failed dag state store
2c5122d is described below

commit 2c5122d2156e7b3621b1f13cc8080d96de03c700
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Feb 9 11:20:57 2021 -0800

    [GOBBLIN-1380] Add retention to failed dag state store
    
    Closes #3220 from jack-moseley/dag-retention
---
 .../service/modules/orchestration/DagManager.java  | 63 ++++++++++++++++++++--
 .../modules/orchestration/DagManagerTest.java      |  3 +-
 2 files changed, 61 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index bd22f74..2b8872e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -127,6 +127,12 @@ public class DagManager extends AbstractIdleService {
   private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = 
FsJobStatusRetriever.class.getName();
   private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + 
"dagStateStoreClass";
   private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  // Failed dag retention settings.
+  // NOTE(review): unlike most keys in this class these are not prefixed with DAG_MANAGER_PREFIX, only with
+  // FAILED_DAG_STATESTORE_PREFIX — confirm this is the intended config namespace.
+  // Name of a java.util.concurrent.TimeUnit constant (parsed with TimeUnit.valueOf) for the retention time below.
+  private static final String FAILED_DAG_RETENTION_TIME_UNIT = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
+  private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
+  // Retention time, in the unit above, measured from a failed dag's flow execution ID; values <= 0 disable retention.
+  private static final String FAILED_DAG_RETENTION_TIME = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
+  private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
+  // Interval, in minutes, between runs of the failed dag retention thread.
+  public static final String FAILED_DAG_POLLING_INTERVAL = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
+  public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
   private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + 
"defaultJobQuota";
   private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + 
"perUserQuota";
@@ -172,12 +178,15 @@ public class DagManager extends AbstractIdleService {
   @Getter
   private final Integer numThreads;
   private final Integer pollingInterval;
+  private final Integer retentionPollingInterval;
   @Getter
   private final JobStatusRetriever jobStatusRetriever;
   private final Config config;
   private final Optional<EventSubmitter> eventSubmitter;
   private final int defaultQuota;
   private final Map<String, Integer> perUserQuota;
+  private final long failedDagRetentionTime;
+  private final Map<String, Dag<JobExecutionPlan>> failedDags = new 
ConcurrentHashMap<>();
 
   private volatile boolean isActive = false;
 
@@ -189,6 +198,7 @@ public class DagManager extends AbstractIdleService {
     this.resumeQueue = initializeDagQueue(this.numThreads);
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.retentionPollingInterval = ConfigUtils.getInt(config, 
FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
     this.instrumentationEnabled = instrumentationEnabled;
     if (instrumentationEnabled) {
       MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
@@ -208,6 +218,9 @@ public class DagManager extends AbstractIdleService {
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException("Exception encountered during DagManager 
initialization", e);
     }
+
+    TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
+    this.failedDagRetentionTime = 
timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, 
DEFAULT_FAILED_DAG_RETENTION_TIME));
   }
 
   JobStatusRetriever createJobStatusRetriever(Config config) throws 
ReflectiveOperationException {
@@ -362,10 +375,12 @@ public class DagManager extends AbstractIdleService {
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
-              queue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, defaultQuota, perUserQuota);
+              queue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, defaultQuota, perUserQuota, failedDags);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, 
this.pollingInterval, TimeUnit.SECONDS);
         }
+        FailedDagRetentionThread failedDagRetentionThread = new 
FailedDagRetentionThread(failedDagStateStore, failedDags, 
failedDagRetentionTime);
+        
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, 
retentionPollingInterval, TimeUnit.MINUTES);
         List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
         log.info("Loading " + dags.size() + " dags from dag state store");
         for (Dag<JobExecutionPlan> dag : dags) {
@@ -399,7 +414,7 @@ public class DagManager extends AbstractIdleService {
     private static final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
     private static final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
-    private final Map<String, Dag<JobExecutionPlan>> failedDags = new 
HashMap<>();
+    private Map<String, Dag<JobExecutionPlan>> failedDags;
     private final Map<String, Dag<JobExecutionPlan>> resumingDags = new 
HashMap<>();
     // dagToJobs holds a map of dagId to running jobs of that dag
     final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
@@ -425,11 +440,12 @@ public class DagManager extends AbstractIdleService {
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> 
cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> 
perUserQuota) {
+        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> 
perUserQuota, Map<String, Dag<JobExecutionPlan>> failedDags) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
       try {
+        this.failedDags = failedDags;
         for (Dag<JobExecutionPlan> dag : failedDagStateStore.getDags()) {
           this.failedDags.put(DagManagerUtils.generateDagId(dag), dag);
         }
@@ -1108,7 +1124,7 @@ public class DagManager extends AbstractIdleService {
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
      */
-    private void cleanUp() {
+    private void cleanUp() throws IOException {
       List<String> dagIdstoClean = new ArrayList<>();
       //Clean up failed dags
       for (String dagId : this.failedDagIdsFinishRunning) {
@@ -1176,6 +1192,45 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
+  /**
+   * Thread that runs retention on failed dags based on their original start 
time (which is the flow execution ID).
+   */
+  public static class FailedDagRetentionThread implements Runnable {
+    private final DagStateStore failedDagStateStore;
+    private final Map<String, Dag<JobExecutionPlan>> failedDags;
+    private final long failedDagRetentionTime;
+
+    FailedDagRetentionThread(DagStateStore failedDagStateStore, Map<String, 
Dag<JobExecutionPlan>> failedDags, long failedDagRetentionTime) {
+      this.failedDagStateStore = failedDagStateStore;
+      this.failedDags = failedDags;
+      this.failedDagRetentionTime = failedDagRetentionTime;
+    }
+
+    @Override
+    public void run() {
+      try {
+        log.info("Cleaning failed dag state store");
+        long startTime = System.currentTimeMillis();
+        List<String> dagKeysToClean = new ArrayList<>();
+
+        for (Map.Entry<String, Dag<JobExecutionPlan>> entry : 
this.failedDags.entrySet()) {
+          if (this.failedDagRetentionTime > 0L && startTime > 
DagManagerUtils.getFlowExecId(entry.getValue()) + this.failedDagRetentionTime) {
+            this.failedDagStateStore.cleanUp(entry.getValue());
+            dagKeysToClean.add(entry.getKey());
+          }
+        }
+
+        for (String key : dagKeysToClean) {
+          this.failedDags.remove(key);
+        }
+
+      log.info("Cleaned " + dagKeysToClean.size() + " from the failed dag 
state store");
+      } catch (Exception e) {
+        log.error("Failed to run retention on failed dag state store", e);
+      }
+    }
+  }
+
   /** Stop the service. */
   @Override
   protected void shutDown()
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index a39457f..62fa545 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.io.FileUtils;
@@ -81,7 +82,7 @@ public class DagManagerTest {
     this.cancelQueue = new LinkedBlockingQueue<>();
     this.resumeQueue = new LinkedBlockingQueue<>();
     this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
_dagStateStore, queue, cancelQueue,
-        resumeQueue, true, 5, new HashMap<>());
+        resumeQueue, true, 5, new HashMap<>(), new ConcurrentHashMap<>());
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);

Reply via email to