This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f11d68f [GOBBLIN-1421] Add running status gauge in DagManager
f11d68f is described below
commit f11d68ff07bb64f29024cb09ec2b7bf217bc0ba5
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Apr 19 11:55:04 2021 -0700
[GOBBLIN-1421] Add running status gauge in DagManager
Closes #3257 from jack-moseley/status-gauge
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 1 +
.../service/modules/orchestration/DagManager.java | 27 ++++++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 89d62a4..269e5bc 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -44,6 +44,7 @@ public class ServiceMetricNames {
public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
public static final String SERVICE_USERS = "ServiceUsers";
public static final String COMPILED = "Compiled";
+ public static final String RUNNING_STATUS = "RunningStatus";
public static final String HELIX_LEADER_STATE = "HelixLeaderState";
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2b8872e..195d9be 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -59,6 +59,7 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -427,6 +428,7 @@ public class DagManager extends AbstractIdleService {
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
+ private static Map<String, FlowState> flowGauges = Maps.newHashMap();
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
@@ -650,6 +652,15 @@ public class DagManager extends AbstractIdleService {
}
}
+ String flowId = DagManagerUtils.getFlowId(dag).toString();
+ if (!flowGauges.containsKey(flowId)) {
+ String flowStateGaugeName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId,
ServiceMetricNames.RUNNING_STATUS);
+ flowGauges.put(flowId, FlowState.RUNNING);
+ ContextAwareGauge<Integer> gauge = RootMetricContext
+ .get().newContextAwareGauge(flowStateGaugeName, () ->
flowGauges.get(flowId).value);
+ RootMetricContext.get().register(flowStateGaugeName, gauge);
+ }
+
log.debug("Dag {} submitting jobs ready for execution.",
DagManagerUtils.getFullyQualifiedDagName(dag));
//Determine the next set of jobs to run and submit them for execution
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted =
submitNext(dagId);
@@ -1136,6 +1147,7 @@ public class DagManager extends AbstractIdleService {
deleteJobState(dagId, dagNode);
}
log.info("Dag {} has finished with status FAILED; Cleaning up dag from
the state store.", dagId);
+
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(),
FlowState.FAILED);
// send an event before cleaning up dag
DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
dagIdstoClean.add(dagId);
@@ -1149,6 +1161,9 @@ public class DagManager extends AbstractIdleService {
addFailedDag(dagId);
status = TimingEvent.FlowTimings.FLOW_FAILED;
this.failedDagIdsFinishAllPossible.remove(dagId);
+
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(),
FlowState.FAILED);
+ } else {
+
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(),
FlowState.SUCCESSFUL);
}
log.info("Dag {} has finished with status {}; Cleaning up dag from
the state store.", dagId, status);
// send an event before cleaning up dag
@@ -1192,6 +1207,18 @@ public class DagManager extends AbstractIdleService {
}
}
+ /**
+  * State of a flow as tracked for its running-status gauge. Each constant carries the
+  * integer that the gauge reports while the flow is in that state.
+  */
+ private enum FlowState {
+   FAILED(-1),
+   RUNNING(0),
+   SUCCESSFUL(1);
+
+   /**
+    * Integer reported by the gauge for this state. Declared final because enum constants
+    * are shared singletons; a reassignable field would let any caller change the value
+    * reported for every flow in this state.
+    */
+   public final int value;
+
+   FlowState(int value) {
+     this.value = value;
+   }
+ }
+
/**
* Thread that runs retention on failed dags based on their original start
* time (which is the flow execution ID).
*/