This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ffcd2d9dc [GOBBLIN-1631]Emit heartbeat for dagManagerThread (#3492)
ffcd2d9dc is described below
commit ffcd2d9dcf5d46c79db5bdafe894f4ce72d31aff
Author: Zihan Li <[email protected]>
AuthorDate: Wed Apr 13 15:17:00 2022 -0700
[GOBBLIN-1631]Emit heartbeat for dagManagerThread (#3492)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1631]Emit heartbeat for dagManagerThread
Co-authored-by: Zihan Li <[email protected]>
---
.../gobblin/service/modules/orchestration/DagManager.java | 10 ++++++++--
.../gobblin/service/modules/orchestration/DagManagerTest.java | 2 +-
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2d89591e1..dfa231989 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
@@ -135,6 +136,7 @@ public class DagManager extends AbstractIdleService {
private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
public static final String FAILED_DAG_POLLING_INTERVAL =
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
+ public static final String DAG_MANAGER_HEARTBEAT =
"gobblin.dagManager.heartbeat-%s";
// Default job start SLA time if configured, measured in minutes. Default is
10 minutes
private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
@@ -379,7 +381,7 @@ public class DagManager extends AbstractIdleService {
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
runQueue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, failedDagIds, allSuccessfulMeter,
- allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager);
+ allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager,
i);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
@@ -448,13 +450,14 @@ public class DagManager extends AbstractIdleService {
private final BlockingQueue<String> cancelQueue;
private final BlockingQueue<String> resumeQueue;
private final Long defaultJobStartSlaTimeMillis;
+ private final Optional<Meter> dagManagerThreadHeartbeat;
/**
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, BlockingQueue<String> resumeQueue,
boolean instrumentationEnabled, Set<String> failedDagIds,
ContextAwareMeter allSuccessfulMeter,
- ContextAwareMeter allFailedMeter, Long defaultJobStartSla,
UserQuotaManager quotaManager) {
+ ContextAwareMeter allFailedMeter, Long defaultJobStartSla,
UserQuotaManager quotaManager, int dagMangerThreadId) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
@@ -474,10 +477,12 @@ public class DagManager extends AbstractIdleService {
ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelay::get);
this.metricContext.register(orchestrationDelayMetric);
+ this.dagManagerThreadHeartbeat =
Optional.of(this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT,
dagMangerThreadId)));
} else {
this.metricContext = null;
this.eventSubmitter = Optional.absent();
this.jobStatusPolledTimer = Optional.absent();
+ this.dagManagerThreadHeartbeat = Optional.absent();
}
}
@@ -522,6 +527,7 @@ public class DagManager extends AbstractIdleService {
log.debug("Cleaning up finished dags..");
cleanUp();
log.debug("Clean up done");
+ Instrumented.markMeter(dagManagerThreadHeartbeat);
} catch (Exception e) {
log.error(String.format("Exception encountered in %s",
getClass().getName()), e);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 9daad5fbc..e09f5bbbe 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -101,7 +101,7 @@ public class DagManagerTest {
this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
_failedDagStateStore, queue, cancelQueue,
resumeQueue, true, new HashSet<>(),
metricContext.contextAwareMeter("successMeter"),
- metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT,
_gobblinServiceQuotaManager);
+ metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT,
_gobblinServiceQuotaManager, 0);
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);