This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b29ce291 [GOBBLIN-1650] Implement flowGroup quotas for the DagManager (#3511)
7b29ce291 is described below

commit 7b29ce2911e5d77410c313f71d315dc8bc82cf5b
Author: William Lo <[email protected]>
AuthorDate: Fri May 27 13:15:30 2022 -0700

    [GOBBLIN-1650] Implement flowGroup quotas for the DagManager (#3511)
    
    * Implement flowGroup quotas for the DagManager
    
    * Address review and add comments to tests
    
    * Add guard for double increments on already tracked dags
    
    * Fix tests
---
 .../modules/orchestration/DagManagerUtils.java     |  3 +
 .../modules/orchestration/UserQuotaManager.java    | 77 ++++++++++++++++------
 .../service/modules/spec/JobExecutionPlan.java     |  2 -
 .../modules/orchestration/DagManagerTest.java      | 58 +++++++++++++++-
 .../orchestration/UserQuotaManagerTest.java        | 58 +++++++++++++++-
 5 files changed, 171 insertions(+), 27 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index b59a9636e..a472a37cb 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -239,6 +239,9 @@ public class DagManagerUtils {
     return user + QUOTA_KEY_SEPERATOR + getSpecExecutorUri(dagNode);
   }
 
+  static String getFlowGroupQuotaKey(String flowGroup, 
DagNode<JobExecutionPlan> dagNode) {
+    return flowGroup + QUOTA_KEY_SEPERATOR + getSpecExecutorUri(dagNode);
+  }
   /**
    * Increment the value of {@link JobExecutionPlan#currentAttempts}
    */
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index d28787f7a..c49cdfc7f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -41,39 +42,54 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 public class UserQuotaManager {
   public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
   public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
   public static final String QUOTA_SEPERATOR = ":";
   public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
+  private final Map<String, Integer> flowGroupToJobCount = new 
ConcurrentHashMap<>();
   private final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
   private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
   Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
   private final int defaultQuota;
 
   UserQuotaManager(Config config) {
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
-    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
-
+    ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
+    ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
+    // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+    for (String flowGroupQuota : ConfigUtils.getStringList(config, 
PER_FLOWGROUP_QUOTA)) {
+      flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    // Keep quotas per user as well in form user:<Quota> which apply for all 
flowgroups
     for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) 
{
-      mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+      userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
     }
-    this.perUserQuota = mapBuilder.build();
+    this.perUserQuota = userMapBuilder.build();
+    this.perFlowGroupQuota = flowGroupMapBuilder.build();
   }
 
   /**
-   * Checks if the dagNode exceeds the statically configured user quota for 
both the proxy user and requester user
+   * Checks if the dagNode exceeds the statically configured user quota for 
both the proxy user, requester user, and flowGroup
    * @throws IOException if the quota is exceeded, and logs a statement
    */
   public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean 
onInit) throws IOException {
+    // Dag is already being tracked, no need to double increment for retries 
and multihop flows
+    if (isDagCurrentlyRunning(dagNode)) {
+      return;
+    }
     String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
     String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
     boolean proxyUserCheck = true;
-    int proxyQuotaIncrement;
     Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for 
which quota is increased
     StringBuilder requesterMessage = new StringBuilder();
     runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
     if (proxyUser != null) {
-      proxyQuotaIncrement = 
incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
proxyUserToJobCount, dagNode, getQuotaForUser(proxyUser));
       proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
       if (!proxyUserCheck) {
         // add 1 to proxyUserIncrement since count starts at 0, and is 
negative if quota is exceeded
@@ -90,7 +106,8 @@ public class UserQuotaManager {
       List<String> uniqueRequesters = 
RequesterService.deserialize(serializedRequesters).stream()
           
.map(ServiceRequester::getName).distinct().collect(Collectors.toList());
       for (String requester : uniqueRequesters) {
-        int userQuotaIncrement = 
incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
+        int userQuotaIncrement = incrementJobCountAndCheckQuota(
+            DagManagerUtils.getUserQuotaKey(requester, dagNode), 
requesterToJobCount, dagNode, getQuotaForUser(requester));
         boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
         usersQuotaIncrement.add(requester);
         requesterCheck = requesterCheck && thisRequesterCheck;
@@ -102,11 +119,20 @@ public class UserQuotaManager {
       }
     }
 
+    int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+        DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
+    boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
+    if (!flowGroupCheck) {
+      requesterMessage.append(String.format(
+          "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests 
above quota=%d%n",
+          flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), 
Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
+    }
+
     // Throw errors for reach quota at the end to avoid inconsistent job counts
-    if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+    if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
       // roll back the increased counts in this block
-      String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
-      decrementQuotaUsage(proxyUserToJobCount, userKey);
+      decrementQuotaUsage(proxyUserToJobCount, 
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
+      decrementQuotaUsage(flowGroupToJobCount, 
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
       decrementQuotaUsageForUsers(usersQuotaIncrement);
       runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
       throw new IOException(requesterMessage.toString());
@@ -115,14 +141,12 @@ public class UserQuotaManager {
 
   /**
    * Increment quota by one for the given map and key.
-   * @return a negative number if quota is already reached for this user
-   *         a positive number if the quota is not reached for this user
+   * @return a negative number if quota is already reached
+   *         a positive number if the quota is not reached
    *         the absolute value of the number is the used quota before this 
increment request
    *         0 if quota usage is not changed
    */
-  private int incrementJobCountAndCheckUserQuota(Map<String, Integer> 
quotaMap, String user, Dag.DagNode<JobExecutionPlan> dagNode) {
-    String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
-
+  private int incrementJobCountAndCheckQuota(String key, Map<String, Integer> 
quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
     // Only increment job count for first attempt, since job is considered 
running between retries
     if (dagNode.getValue().getCurrentAttempts() != 1) {
       return 0;
@@ -138,7 +162,7 @@ public class UserQuotaManager {
       currentCount = 0;
     }
 
-    if (currentCount >= getQuotaForUser(user)) {
+    if (currentCount >= quotaForKey) {
       return -currentCount; // increment must have crossed the quota
     } else {
       return currentCount;
@@ -159,6 +183,9 @@ public class UserQuotaManager {
       String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, 
dagNode);
       decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
     }
+    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
+    decrementQuotaUsage(flowGroupToJobCount, 
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
     String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
     if (serializedRequesters != null) {
       try {
@@ -174,14 +201,14 @@ public class UserQuotaManager {
     return true;
   }
 
-  private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) 
{
+  private void decrementQuotaUsage(Map<String, Integer> quotaMap, String key) {
     Integer currentCount;
-    if (user == null) {
+    if (key == null) {
       return;
     }
     do {
-      currentCount = quotaMap.get(user);
-    } while (currentCount != null && currentCount > 0 && 
!quotaMap.replace(user, currentCount, currentCount - 1));
+      currentCount = quotaMap.get(key);
+    } while (currentCount != null && currentCount > 0 && 
!quotaMap.replace(key, currentCount, currentCount - 1));
   }
 
   private void decrementQuotaUsageForUsers(Set<String> 
requestersToDecreaseCount) {
@@ -194,4 +221,12 @@ public class UserQuotaManager {
     return this.perUserQuota.getOrDefault(user, defaultQuota);
   }
 
+  private int getQuotaForFlowGroup(String flowGroup) {
+    return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
+  }
+
+  private boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode) 
{
+    return 
this.runningDagIds.containsKey(DagManagerUtils.generateDagId(dagNode));
+  }
+
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 50d2a5e14..79374af33 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -122,8 +122,6 @@ public class JobExecutionPlan {
           .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
           //Add flowName to job spec
           .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
-          //Add job name
-          .withValue(ConfigurationKeys.JOB_NAME_KEY, 
ConfigValueFactory.fromAnyRef(jobName))
           //Add flow execution id
           .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
ConfigValueFactory.fromAnyRef(flowExecutionId))
           // Remove schedule due to namespace conflict with azkaban schedule 
key, but still keep track if flow is scheduled or not
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 3b685011f..c1b0b80ef 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -161,8 +161,8 @@ public class DagManagerTest {
           addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
           addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
           addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, 
flowFailureOption).
-          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build()
-          .withFallback(additionalConfig);
+          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+      jobConfig = additionalConfig.withFallback(jobConfig);
       if ((i == 1) || (i == 2)) {
         jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0"));
       } else if (i == 3) {
@@ -949,6 +949,60 @@ public class DagManagerTest {
   }
 
   @Test (dependsOnMethods = "testQuotaDecrement")
+  public void testQuotasRetryFlow() throws URISyntaxException, IOException {
+    List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "user", 
ConfigFactory.empty());
+    //Add a dag to the queue of dags
+    this.queue.offer(dagList.get(0));
+    Config jobConfig0 = 
dagList.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
+    Config jobConfig1 = 
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
+    Iterator<JobStatus> jobStatusIterator0 =
+        getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED), 
true);
+    // Cleanup the running job that is scheduled normally
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group0", String.valueOf(ExecutionStatus.RUNNING), true);
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED));
+    Iterator<JobStatus> jobStatusIterator3 =
+        getMockJobStatus("flow0", "group0", 
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator4 =
+        getMockJobStatus("flow1", "group1", 
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED));
+    Iterator<JobStatus> jobStatusIterator5 =
+        getMockJobStatus("flow1", "group1", 
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+            "job0", "group1", String.valueOf(ExecutionStatus.COMPLETE));
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow0"), 
Mockito.eq("group0"), Mockito.anyLong(),
+            Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(jobStatusIterator0)
+        .thenReturn(jobStatusIterator1)
+        .thenReturn(jobStatusIterator2)
+        .thenReturn(jobStatusIterator3);
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow1"), 
Mockito.eq("group1"), Mockito.anyLong(),
+            Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(jobStatusIterator4)
+        .thenReturn(jobStatusIterator5);
+
+    // Dag1 is running
+    this._dagManagerThread.run();
+    // Dag1 fails and is orchestrated again
+    this._dagManagerThread.run();
+    // Dag1 is running again
+    this._dagManagerThread.run();
+    // Dag1 is marked as complete, should be able to run the next Dag without 
hitting the quota limit
+    this._dagManagerThread.run();
+
+    this.queue.offer(dagList.get(1));
+    this._dagManagerThread.run();
+    this._dagManagerThread.run(); // cleanup
+  }
+
+  @Test (dependsOnMethods = "testQuotasRetryFlow")
   public void testEmitFlowMetricOnlyIfNotAdhoc() throws URISyntaxException, 
IOException {
 
     Long flowId = System.currentTimeMillis();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
index f3fdf1251..98ebc6bca 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
@@ -21,6 +21,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 import java.io.IOException;
 import java.util.List;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.testng.Assert;
@@ -35,7 +36,8 @@ public class UserQuotaManagerTest {
   @BeforeClass
   public void setUp() {
     Config quotaConfig = ConfigFactory.empty()
-        .withValue(UserQuotaManager.PER_USER_QUOTA, 
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1"));
+        .withValue(UserQuotaManager.PER_USER_QUOTA, 
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
+        .withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA, 
ConfigValueFactory.fromAnyRef("group1:1,group2:2"));
     this._quotaManager = new UserQuotaManager(quotaConfig);
   }
 
@@ -55,7 +57,7 @@ public class UserQuotaManagerTest {
   }
 
   @Test
-  public void testExceedsQuotaThrowsException() throws Exception {
+  public void testExceedsUserQuotaThrowsException() throws Exception {
     List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user2", 
ConfigFactory.empty());
 
     // Ensure that the current attempt is 1, normally done by DagManager
@@ -70,6 +72,7 @@ public class UserQuotaManagerTest {
 
   @Test
   public void testMultipleRemoveQuotasIdempotent() throws Exception {
+    // Test that multiple decrements cannot cause the number to decrease by 
more than 1
     List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user3", 
ConfigFactory.empty());
 
     // Ensure that the current attempt is 1, normally done by DagManager
@@ -80,4 +83,55 @@ public class UserQuotaManagerTest {
     
Assert.assertTrue(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
     
Assert.assertFalse(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
   }
+
+  @Test
+  public void testExceedsFlowGroupQuotaThrowsException() throws Exception {
+    // Test flowgroup quotas
+    List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user4", 
ConfigFactory.empty().withValue(
+        ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("group1")));
+
+    // Ensure that the current attempt is 1, normally done by DagManager
+    dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
+    dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
+
+    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+    Assert.assertThrows(IOException.class, () -> {
+      this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false);
+    });
+  }
+
+
+  @Test
+  public void testUserAndFlowGroupQuotaMultipleUsersAdd() throws Exception {
+    // Test that user quota and group quotas can both be exceeded, and that 
decrementing one flow will change both quotas
+    Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("1", 
System.currentTimeMillis(),DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("group2")));
+    Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("2", 
System.currentTimeMillis(),DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user6", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("group2")));
+    Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("3", 
System.currentTimeMillis(),DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user6", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("group3")));
+    Dag<JobExecutionPlan> dag4 = DagManagerTest.buildDag("4", 
System.currentTimeMillis(),DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("group2")));
+    // Ensure that the current attempt is 1, normally done by DagManager
+    dag1.getNodes().get(0).getValue().setCurrentAttempts(1);
+    dag2.getNodes().get(0).getValue().setCurrentAttempts(1);
+    dag3.getNodes().get(0).getValue().setCurrentAttempts(1);
+    dag4.getNodes().get(0).getValue().setCurrentAttempts(1);
+
+    this._quotaManager.checkQuota(dag1.getNodes().get(0), false);
+    this._quotaManager.checkQuota(dag2.getNodes().get(0), false);
+
+    // Should fail due to user quota
+    Assert.assertThrows(IOException.class, () -> {
+      this._quotaManager.checkQuota(dag3.getNodes().get(0), false);
+    });
+    // Should fail due to flowgroup quota
+    Assert.assertThrows(IOException.class, () -> {
+      this._quotaManager.checkQuota(dag4.getNodes().get(0), false);
+    });
+    // should pass due to quota being released
+    this._quotaManager.releaseQuota(dag2.getNodes().get(0));
+    this._quotaManager.checkQuota(dag3.getNodes().get(0), false);
+    this._quotaManager.checkQuota(dag4.getNodes().get(0), false);
+  }
 }

Reply via email to