This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 631e1d6  [GOBBLIN-1435] Update flow state gauge name and fix bug when rerunning flow
631e1d6 is described below

commit 631e1d67333d4a75e372269ab8bb88d802b20e41
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Apr 22 13:00:31 2021 -0700

    [GOBBLIN-1435] Update flow state gauge name and fix bug when rerunning flow
    
    Closes #3269 from jack-moseley/gauge-fix
---
 .../gobblin/service/modules/orchestration/DagManager.java   | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 195d9be..2641b92 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -69,6 +69,7 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -652,12 +653,13 @@ public class DagManager extends AbstractIdleService {
         }
       }
 
-      String flowId = DagManagerUtils.getFlowId(dag).toString();
-      if (!flowGauges.containsKey(flowId)) {
-        String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId, ServiceMetricNames.RUNNING_STATUS);
-        flowGauges.put(flowId, FlowState.RUNNING);
+      FlowId flowId = DagManagerUtils.getFlowId(dag);
+      if (!flowGauges.containsKey(flowId.toString())) {
+        String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(),
+            flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
+        flowGauges.put(flowId.toString(), FlowState.RUNNING);
         ContextAwareGauge<Integer> gauge = RootMetricContext
-            .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId).value);
+            .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
         RootMetricContext.get().register(flowStateGaugeName, gauge);
       }
 
@@ -670,6 +672,7 @@ public class DagManager extends AbstractIdleService {
 
       // Set flow status to running
       DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
+      flowGauges.put(flowId.toString(), FlowState.RUNNING);
 
       // Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
       // the time difference between the instant when a flow first transitions to the running state and the instant

Reply via email to