This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a497969a [GOBBLIN-1638] Fix unbalanced running count metrics due to Azkaban failures (#3499)
3a497969a is described below

commit 3a497969a8888fe730151fb5bfbcab281b15eeb2
Author: William Lo <[email protected]>
AuthorDate: Wed May 4 17:30:19 2022 -0700

    [GOBBLIN-1638] Fix unbalanced running count metrics due to Azkaban failures (#3499)
    
    * Fix unbalanced running count metrics due to Azkaban failures
    
    * Fix tests
    
    * Address comments
    
    * Rename test
    
    * Update comment with review
---
 .../service/modules/orchestration/AzkabanClient.java     |  2 --
 .../service/modules/orchestration/DagManager.java        | 13 ++++++++-----
 .../service/modules/orchestration/UserQuotaManager.java  |  1 -
 .../modules/orchestration/UserQuotaManagerTest.java      | 16 ++++++++++++++--
 4 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index 70d8b06b2..c8e538032 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import lombok.Builder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ObjectUtils;
@@ -113,7 +112,6 @@ public class AzkabanClient implements Closeable {
 
     this.retryer = RetryerBuilder.<AzkabanClientStatus>newBuilder()
         .retryIfExceptionOfType(InvalidSessionException.class)
-        .retryIfExceptionOfType(TimeoutException.class)
         
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.requestTimeout.toMillis(),
 TimeUnit.MILLISECONDS,
             this.executorService))
         .withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 3a06a5179..41b407905 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -952,6 +952,14 @@ public class DagManager extends AbstractIdleService {
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
 
+        // Increment job count before submitting the job onto the spec producer, in case that throws an exception.
+        // By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release.
+        // Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
+        // When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
+        if (this.metricContext != null) {
+          getRunningJobsCounter(dagNode).inc();
+          getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+        }
         // Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
         // The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
         // either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
@@ -961,11 +969,6 @@ public class DagManager extends AbstractIdleService {
         //Persist the dag
         
this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode)));
 
-        if (this.metricContext != null) {
-          getRunningJobsCounter(dagNode).inc();
-          
getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
-        }
-
         addSpecFuture.get();
 
         jobMetadata.put(TimingEvent.METADATA_MESSAGE, 
producer.getExecutionLink(addSpecFuture, specExecutorUri));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index 58b212db2..d28787f7a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -72,7 +72,6 @@ public class UserQuotaManager {
     Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for 
which quota is increased
     StringBuilder requesterMessage = new StringBuilder();
     runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
-
     if (proxyUser != null) {
       proxyQuotaIncrement = 
incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
       proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
index bb8e6a6cc..f3fdf1251 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
@@ -35,8 +35,7 @@ public class UserQuotaManagerTest {
   @BeforeClass
   public void setUp() {
     Config quotaConfig = ConfigFactory.empty()
-        .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1"))
-        .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user2:1"));
+        .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1"));
     this._quotaManager = new UserQuotaManager(quotaConfig);
   }
 
@@ -68,4 +67,17 @@ public class UserQuotaManagerTest {
       this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false);
     });
   }
+
+  @Test
+  public void testMultipleRemoveQuotasIdempotent() throws Exception {
+    List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user3", ConfigFactory.empty());
+
+    // Ensure that the current attempt is 1, normally done by DagManager
+    dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
+    dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
+
+    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+    Assert.assertTrue(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
+    Assert.assertFalse(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
+  }
 }

Reply via email to