This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5684ed76b Fix running counts for retried flows (#3520)
5684ed76b is described below

commit 5684ed76b5054f7727ed4187e5711f962ceabac6
Author: William Lo <[email protected]>
AuthorDate: Wed Jun 15 15:04:14 2022 -0700

    Fix running counts for retried flows (#3520)
---
 .../apache/gobblin/service/modules/orchestration/DagManager.java | 3 ++-
 .../gobblin/service/modules/orchestration/DagManagerTest.java    | 9 +++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f57169553..dcfbe9cdf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -963,7 +963,8 @@ public class DagManager extends AbstractIdleService {
         // By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release.
         // Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
         // When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
-        if (this.metricContext != null) {
+        // Ensure that we do not double increment for flows that are retried
+        if (this.metricContext != null && dagNode.getValue().getCurrentAttempts() == 1) {
           getRunningJobsCounter(dagNode).inc();
           getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
         }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c1b0b80ef..216e28525 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -990,10 +990,19 @@ public class DagManagerTest {
 
     // Dag1 is running
     this._dagManagerThread.run();
+    SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
+    Assert.assertEquals(allCounters.get(MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ServiceMetricNames.SERVICE_USERS,
+        "user")).getCount(), 1);
     // Dag1 fails and is orchestrated again
     this._dagManagerThread.run();
     // Dag1 is running again
     this._dagManagerThread.run();
+    Assert.assertEquals(allCounters.get(MetricRegistry.name(
+        ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+        ServiceMetricNames.SERVICE_USERS,
+        "user")).getCount(), 1);
     // Dag1 is marked as complete, should be able to run the next Dag without hitting the quota limit
     this._dagManagerThread.run();
 

Reply via email to