This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5684ed76b Fix running counts for retried flows (#3520)
5684ed76b is described below
commit 5684ed76b5054f7727ed4187e5711f962ceabac6
Author: William Lo <[email protected]>
AuthorDate: Wed Jun 15 15:04:14 2022 -0700
Fix running counts for retried flows (#3520)
---
.../apache/gobblin/service/modules/orchestration/DagManager.java | 3 ++-
.../gobblin/service/modules/orchestration/DagManagerTest.java | 9 +++++++++
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f57169553..dcfbe9cdf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -963,7 +963,8 @@ public class DagManager extends AbstractIdleService {
// By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release.
// Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
// When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
- if (this.metricContext != null) {
+ // Ensure that we do not double increment for flows that are retried
+ if (this.metricContext != null && dagNode.getValue().getCurrentAttempts() == 1) {
getRunningJobsCounter(dagNode).inc();
getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c1b0b80ef..216e28525 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -990,10 +990,19 @@ public class DagManagerTest {
// Dag1 is running
this._dagManagerThread.run();
+ SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
+ Assert.assertEquals(allCounters.get(MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user")).getCount(), 1);
// Dag1 fails and is orchestrated again
this._dagManagerThread.run();
// Dag1 is running again
this._dagManagerThread.run();
+ Assert.assertEquals(allCounters.get(MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user")).getCount(), 1);
// Dag1 is marked as complete, should be able to run the next Dag without hitting the quota limit
this._dagManagerThread.run();