This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 631e1d6 [GOBBLIN-1435] Update flow state gauge name and fix bug when rerunning flow
631e1d6 is described below
commit 631e1d67333d4a75e372269ab8bb88d802b20e41
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Apr 22 13:00:31 2021 -0700
[GOBBLIN-1435] Update flow state gauge name and fix bug when rerunning flow
Closes #3269 from jack-moseley/gauge-fix
---
.../gobblin/service/modules/orchestration/DagManager.java | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 195d9be..2641b92 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -69,6 +69,7 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -652,12 +653,13 @@ public class DagManager extends AbstractIdleService {
}
}
- String flowId = DagManagerUtils.getFlowId(dag).toString();
- if (!flowGauges.containsKey(flowId)) {
- String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId, ServiceMetricNames.RUNNING_STATUS);
- flowGauges.put(flowId, FlowState.RUNNING);
+ FlowId flowId = DagManagerUtils.getFlowId(dag);
+ if (!flowGauges.containsKey(flowId.toString())) {
+ String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(),
+ flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
+ flowGauges.put(flowId.toString(), FlowState.RUNNING);
ContextAwareGauge<Integer> gauge = RootMetricContext
- .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId).value);
+ .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
RootMetricContext.get().register(flowStateGaugeName, gauge);
}
@@ -670,6 +672,7 @@ public class DagManager extends AbstractIdleService {
// Set flow status to running
DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
+ flowGauges.put(flowId.toString(), FlowState.RUNNING);
// Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
// the time difference between the instant when a flow first transitions to the running state and the instant