This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e0a2be76 [GOBBLIN-1778] Add house keeping thread in DagManager to periodically sync in memory state with mysql table (#3635)
9e0a2be76 is described below

commit 9e0a2be76c18346a11b5bd9717a42008708c792c
Author: Zihan Li <[email protected]>
AuthorDate: Mon Feb 6 14:16:24 2023 -0800

    [GOBBLIN-1778] Add house keeping thread in DagManager to periodically sync in memory state with mysql table (#3635)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1778] Add house keeping thread in DagManager to periodically sync in memory state with mysql table
    
    * fix unit test
    
    * address comments
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../apache/gobblin/metrics/InnerMetricContext.java |  2 +-
 .../service/modules/orchestration/DagManager.java  | 52 +++++++++++++++-------
 .../modules/orchestration/DagManagerMetrics.java   |  9 ++--
 .../modules/orchestration/DagManagerFlowTest.java  |  7 +++
 4 files changed, 50 insertions(+), 20 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
index 273897412..28ce9bd46 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
@@ -299,7 +299,7 @@ public class InnerMetricContext extends MetricRegistry 
implements ReportableCont
   @Override
   public synchronized boolean remove(String name) {
     MetricContext metricContext = this.metricContext.get();
-    if (metricContext != null) {
+    if (metricContext != null && this.contextAwareMetrics.get(name) != null) {
       
metricContext.removeFromMetrics(this.contextAwareMetrics.get(name).getContextAwareMetric());
     }
     return this.contextAwareMetrics.remove(name) != null && 
removeChildrenMetrics(name);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 0dd31d274..6f26dc3e5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -135,7 +135,8 @@ public class DagManager extends AbstractIdleService {
   // Default job start SLA time if configured, measured in minutes. Default is 
10 minutes
   private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
   private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
-
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
   /**
    * Action to be performed on a {@link Dag}, in case of a job failure. 
Currently, we allow 2 modes:
    * <ul>
@@ -188,6 +189,9 @@ public class DagManager extends AbstractIdleService {
   private final boolean instrumentationEnabled;
   private DagStateStore dagStateStore;
   private Map<URI, TopologySpec> topologySpecMap;
+  private int houseKeepingThreadInitialDelay = 
INITIAL_HOUSEKEEPING_THREAD_DELAY;
+  @Getter
+  private ScheduledExecutorService houseKeepingThreadPool;
 
   @Getter
   private final Integer numThreads;
@@ -388,7 +392,7 @@ public class DagManager extends AbstractIdleService {
                 topologySpecMap);
         Set<String> failedDagIds = 
Collections.synchronizedSet(failedDagStateStore.getDagIds());
 
-       this.dagManagerMetrics.activate();
+        this.dagManagerMetrics.activate();
 
         UserQuotaManager quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
             ConfigUtils.getString(config, 
ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
@@ -405,10 +409,15 @@ public class DagManager extends AbstractIdleService {
         }
         FailedDagRetentionThread failedDagRetentionThread = new 
FailedDagRetentionThread(failedDagStateStore, failedDagIds, 
failedDagRetentionTime);
         
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, 
retentionPollingInterval, TimeUnit.MINUTES);
-        List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
-        log.info("Loading " + dags.size() + " dags from dag state store");
-        for (Dag<JobExecutionPlan> dag : dags) {
-          addDag(dag, false, false);
+        loadDagFromDagStateStore();
+        this.houseKeepingThreadPool = 
Executors.newSingleThreadScheduledExecutor();
+        for (int delay = houseKeepingThreadInitialDelay; delay < 
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
+          this.houseKeepingThreadPool.schedule(() -> {
+            try {
+              loadDagFromDagStateStore();
+            } catch (Exception e ) {
+              log.error("failed to sync dag state store due to ", e);
+            }}, delay, TimeUnit.MINUTES);
         }
         if (dagActionStore.isPresent()) {
           Collection<DagActionStore.DagAction> dagActions = 
dagActionStore.get().getDagActions();
@@ -429,6 +438,7 @@ public class DagManager extends AbstractIdleService {
         log.info("Inactivating the DagManager. Shutting down all DagManager 
threads");
         this.scheduledExecutorPool.shutdown();
         this.dagManagerMetrics.cleanup();
+        this.houseKeepingThreadPool.shutdown();
         try {
           this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, 
TimeUnit.SECONDS);
         } catch (InterruptedException e) {
@@ -441,6 +451,16 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
+  private void loadDagFromDagStateStore() throws IOException {
+    List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
+    log.info("Loading " + dags.size() + " dags from dag state store");
+    for (Dag<JobExecutionPlan> dag : dags) {
+      if (this.isActive) {
+        addDag(dag, false, false);
+      }
+    }
+  }
+
   /**
    * Each {@link DagManagerThread} performs 2 actions when scheduled:
    * <ol>
@@ -789,10 +809,10 @@ public class DagManager extends AbstractIdleService {
             submitJob(node);
           }
         } catch (Exception e) {
-            // Error occurred while processing dag, continue processing other 
dags assigned to this thread
-            log.error(String.format("Exception caught in DagManager while 
processing dag %s due to ",
-                DagManagerUtils.getFullyQualifiedDagName(node)), e);
-          }
+          // Error occurred while processing dag, continue processing other 
dags assigned to this thread
+          log.error(String.format("Exception caught in DagManager while 
processing dag %s due to ",
+              DagManagerUtils.getFullyQualifiedDagName(node)), e);
+        }
       }
 
       for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: 
nextSubmitted.entrySet()) {
@@ -1170,11 +1190,11 @@ public class DagManager extends AbstractIdleService {
       log.info("Cleaning up dagId {}", dagId);
       // clears flow event after cancelled job to allow resume event status to 
be set
       this.dags.get(dagId).setFlowEvent(null);
-       try {
-         this.dagStateStore.cleanUp(dags.get(dagId));
-       } catch (IOException ioe) {
-         log.error(String.format("Failed to clean %s from backStore due to:", 
dagId), ioe);
-       }
+      try {
+        this.dagStateStore.cleanUp(dags.get(dagId));
+      } catch (IOException ioe) {
+        log.error(String.format("Failed to clean %s from backStore due to:", 
dagId), ioe);
+      }
       this.dags.remove(dagId);
       this.dagToJobs.remove(dagId);
     }
@@ -1222,7 +1242,7 @@ public class DagManager extends AbstractIdleService {
           }
         }
 
-      log.info("Cleaned " + numCleaned + " dags from the failed dag state 
store");
+        log.info("Cleaned " + numCleaned + " dags from the failed dag state 
store");
       } catch (Exception e) {
         log.error("Failed to run retention on failed dag state store", e);
       }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index acf7c0310..a81bf39b1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -244,8 +244,11 @@ public class DagManagerMetrics {
   }
 
   public void cleanup() {
-    // The DMThread's metrics mappings follow the lifecycle of the DMThread 
itself and so are lost by DM deactivation-reactivation but the 
RootMetricContext is a (persistent) singleton.
-    // To avoid IllegalArgumentException by the RMC preventing (re-)add of a 
metric already known, remove all metrics that a new DMThread thread would 
attempt to add (in DagManagerThread::initialize) whenever running 
post-re-enablement
-    RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+    // Add null check so that unit test will not affect each other when we 
de-active non-instrumented DagManager
+    if(this.metricContext != null) {
+      // The DMThread's metrics mappings follow the lifecycle of the DMThread 
itself and so are lost by DM deactivation-reactivation but the 
RootMetricContext is a (persistent) singleton.
+      // To avoid IllegalArgumentException by the RMC preventing (re-)add of a 
metric already known, remove all metrics that a new DMThread thread would 
attempt to add (in DagManagerThread::initialize) whenever running 
post-re-enablement
+      RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+    }
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 59aeda41f..9366b7a8a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
 import org.mockito.Mockito;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -97,6 +98,12 @@ public class DagManagerFlowTest {
     Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
   }
 
+  @AfterClass
+  public void cleanUp() throws Exception {
+    dagManager.setActive(false);
+    Assert.assertEquals(dagManager.getHouseKeepingThreadPool().isShutdown(), 
true);
+  }
+
   @Test
   void testAddDeleteSpec() throws Exception {
     long flowExecutionId1 = System.currentTimeMillis();

Reply via email to