This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9e0a2be76 [GOBBLIN-1778] Add house keeping thread in DagManager to
periodically sync in memory state with mysql table (#3635)
9e0a2be76 is described below
commit 9e0a2be76c18346a11b5bd9717a42008708c792c
Author: Zihan Li <[email protected]>
AuthorDate: Mon Feb 6 14:16:24 2023 -0800
[GOBBLIN-1778] Add house keeping thread in DagManager to periodically sync
in memory state with mysql table (#3635)
* address comments
* use connection manager when httpclient is not closeable
* [GOBBLIN-1778] Add house keeping thread in DagManager to periodically
sync in memory state with mysql table
* fix unit test
* address comments
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../apache/gobblin/metrics/InnerMetricContext.java | 2 +-
.../service/modules/orchestration/DagManager.java | 52 +++++++++++++++-------
.../modules/orchestration/DagManagerMetrics.java | 9 ++--
.../modules/orchestration/DagManagerFlowTest.java | 7 +++
4 files changed, 50 insertions(+), 20 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
index 273897412..28ce9bd46 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
@@ -299,7 +299,7 @@ public class InnerMetricContext extends MetricRegistry
implements ReportableCont
@Override
public synchronized boolean remove(String name) {
MetricContext metricContext = this.metricContext.get();
- if (metricContext != null) {
+ if (metricContext != null && this.contextAwareMetrics.get(name) != null) {
metricContext.removeFromMetrics(this.contextAwareMetrics.get(name).getContextAwareMetric());
}
return this.contextAwareMetrics.remove(name) != null &&
removeChildrenMetrics(name);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 0dd31d274..6f26dc3e5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -135,7 +135,8 @@ public class DagManager extends AbstractIdleService {
// Default job start SLA time if configured, measured in minutes. Default is
10 minutes
private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
-
+ private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+ private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
/**
* Action to be performed on a {@link Dag}, in case of a job failure.
Currently, we allow 2 modes:
* <ul>
@@ -188,6 +189,9 @@ public class DagManager extends AbstractIdleService {
private final boolean instrumentationEnabled;
private DagStateStore dagStateStore;
private Map<URI, TopologySpec> topologySpecMap;
+ private int houseKeepingThreadInitialDelay =
INITIAL_HOUSEKEEPING_THREAD_DELAY;
+ @Getter
+ private ScheduledExecutorService houseKeepingThreadPool;
@Getter
private final Integer numThreads;
@@ -388,7 +392,7 @@ public class DagManager extends AbstractIdleService {
topologySpecMap);
Set<String> failedDagIds =
Collections.synchronizedSet(failedDagStateStore.getDagIds());
- this.dagManagerMetrics.activate();
+ this.dagManagerMetrics.activate();
UserQuotaManager quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config,
ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
@@ -405,10 +409,15 @@ public class DagManager extends AbstractIdleService {
}
FailedDagRetentionThread failedDagRetentionThread = new
FailedDagRetentionThread(failedDagStateStore, failedDagIds,
failedDagRetentionTime);
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0,
retentionPollingInterval, TimeUnit.MINUTES);
- List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
- log.info("Loading " + dags.size() + " dags from dag state store");
- for (Dag<JobExecutionPlan> dag : dags) {
- addDag(dag, false, false);
+ loadDagFromDagStateStore();
+ this.houseKeepingThreadPool =
Executors.newSingleThreadScheduledExecutor();
+ for (int delay = houseKeepingThreadInitialDelay; delay <
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
+ this.houseKeepingThreadPool.schedule(() -> {
+ try {
+ loadDagFromDagStateStore();
+ } catch (Exception e ) {
+ log.error("failed to sync dag state store due to ", e);
+ }}, delay, TimeUnit.MINUTES);
}
if (dagActionStore.isPresent()) {
Collection<DagActionStore.DagAction> dagActions =
dagActionStore.get().getDagActions();
@@ -429,6 +438,7 @@ public class DagManager extends AbstractIdleService {
log.info("Inactivating the DagManager. Shutting down all DagManager
threads");
this.scheduledExecutorPool.shutdown();
this.dagManagerMetrics.cleanup();
+ this.houseKeepingThreadPool.shutdown();
try {
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -441,6 +451,16 @@ public class DagManager extends AbstractIdleService {
}
}
+ private void loadDagFromDagStateStore() throws IOException {
+ List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
+ log.info("Loading " + dags.size() + " dags from dag state store");
+ for (Dag<JobExecutionPlan> dag : dags) {
+ if (this.isActive) {
+ addDag(dag, false, false);
+ }
+ }
+ }
+
/**
* Each {@link DagManagerThread} performs 2 actions when scheduled:
* <ol>
@@ -789,10 +809,10 @@ public class DagManager extends AbstractIdleService {
submitJob(node);
}
} catch (Exception e) {
- // Error occurred while processing dag, continue processing other
dags assigned to this thread
- log.error(String.format("Exception caught in DagManager while
processing dag %s due to ",
- DagManagerUtils.getFullyQualifiedDagName(node)), e);
- }
+ // Error occurred while processing dag, continue processing other
dags assigned to this thread
+ log.error(String.format("Exception caught in DagManager while
processing dag %s due to ",
+ DagManagerUtils.getFullyQualifiedDagName(node)), e);
+ }
}
for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry:
nextSubmitted.entrySet()) {
@@ -1170,11 +1190,11 @@ public class DagManager extends AbstractIdleService {
log.info("Cleaning up dagId {}", dagId);
// clears flow event after cancelled job to allow resume event status to
be set
this.dags.get(dagId).setFlowEvent(null);
- try {
- this.dagStateStore.cleanUp(dags.get(dagId));
- } catch (IOException ioe) {
- log.error(String.format("Failed to clean %s from backStore due to:",
dagId), ioe);
- }
+ try {
+ this.dagStateStore.cleanUp(dags.get(dagId));
+ } catch (IOException ioe) {
+ log.error(String.format("Failed to clean %s from backStore due to:",
dagId), ioe);
+ }
this.dags.remove(dagId);
this.dagToJobs.remove(dagId);
}
@@ -1222,7 +1242,7 @@ public class DagManager extends AbstractIdleService {
}
}
- log.info("Cleaned " + numCleaned + " dags from the failed dag state
store");
+ log.info("Cleaned " + numCleaned + " dags from the failed dag state
store");
} catch (Exception e) {
log.error("Failed to run retention on failed dag state store", e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index acf7c0310..a81bf39b1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -244,8 +244,11 @@ public class DagManagerMetrics {
}
public void cleanup() {
- // The DMThread's metrics mappings follow the lifecycle of the DMThread
itself and so are lost by DM deactivation-reactivation but the
RootMetricContext is a (persistent) singleton.
- // To avoid IllegalArgumentException by the RMC preventing (re-)add of a
metric already known, remove all metrics that a new DMThread thread would
attempt to add (in DagManagerThread::initialize) whenever running
post-re-enablement
- RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+ // Add null check so that unit tests will not affect each other when we
deactivate a non-instrumented DagManager
+ if(this.metricContext != null) {
+ // The DMThread's metrics mappings follow the lifecycle of the DMThread
itself and so are lost by DM deactivation-reactivation but the
RootMetricContext is a (persistent) singleton.
+ // To avoid IllegalArgumentException by the RMC preventing (re-)add of a
metric already known, remove all metrics that a new DMThread thread would
attempt to add (in DagManagerThread::initialize) whenever running
post-re-enablement
+ RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+ }
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 59aeda41f..9366b7a8a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -97,6 +98,12 @@ public class DagManagerFlowTest {
Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
}
+ @AfterClass
+ public void cleanUp() throws Exception {
+ dagManager.setActive(false);
+ Assert.assertEquals(dagManager.getHouseKeepingThreadPool().isShutdown(),
true);
+ }
+
@Test
void testAddDeleteSpec() throws Exception {
long flowExecutionId1 = System.currentTimeMillis();