This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7b29ce291 [GOBBLIN-1650] Implement flowGroup quotas for the DagManager
(#3511)
7b29ce291 is described below
commit 7b29ce2911e5d77410c313f71d315dc8bc82cf5b
Author: William Lo <[email protected]>
AuthorDate: Fri May 27 13:15:30 2022 -0700
[GOBBLIN-1650] Implement flowGroup quotas for the DagManager (#3511)
* Implement flowGroup quotas for the DagManager
* Address review and add comments to tests
* Add guard for double increments on already tracked dags
* Fix tests
---
.../modules/orchestration/DagManagerUtils.java | 3 +
.../modules/orchestration/UserQuotaManager.java | 77 ++++++++++++++++------
.../service/modules/spec/JobExecutionPlan.java | 2 -
.../modules/orchestration/DagManagerTest.java | 58 +++++++++++++++-
.../orchestration/UserQuotaManagerTest.java | 58 +++++++++++++++-
5 files changed, 171 insertions(+), 27 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index b59a9636e..a472a37cb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -239,6 +239,9 @@ public class DagManagerUtils {
return user + QUOTA_KEY_SEPERATOR + getSpecExecutorUri(dagNode);
}
+ static String getFlowGroupQuotaKey(String flowGroup,
DagNode<JobExecutionPlan> dagNode) {
+ return flowGroup + QUOTA_KEY_SEPERATOR + getSpecExecutorUri(dagNode);
+ }
/**
* Increment the value of {@link JobExecutionPlan#currentAttempts}
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index d28787f7a..c49cdfc7f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -41,39 +42,54 @@ import org.apache.gobblin.util.ConfigUtils;
@Slf4j
public class UserQuotaManager {
public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
public static final String QUOTA_SEPERATOR = ":";
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
+ private final Map<String, Integer> flowGroupToJobCount = new
ConcurrentHashMap<>();
private final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
private final int defaultQuota;
UserQuotaManager(Config config) {
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
- ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
-
+ ImmutableMap.Builder<String, Integer> userMapBuilder =
ImmutableMap.builder();
+ ImmutableMap.Builder<String, Integer> flowGroupMapBuilder =
ImmutableMap.builder();
+ // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+ for (String flowGroupQuota : ConfigUtils.getStringList(config,
PER_FLOWGROUP_QUOTA)) {
+ flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ // Keep quotas per user as well in form user:<Quota> which apply for all
flowgroups
for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA))
{
- mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+ userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
}
- this.perUserQuota = mapBuilder.build();
+ this.perUserQuota = userMapBuilder.build();
+ this.perFlowGroupQuota = flowGroupMapBuilder.build();
}
/**
- * Checks if the dagNode exceeds the statically configured user quota for
both the proxy user and requester user
+ * Checks if the dagNode exceeds the statically configured user quota for
both the proxy user, requester user, and flowGroup
* @throws IOException if the quota is exceeded, and logs a statement
*/
public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean
onInit) throws IOException {
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ if (isDagCurrentlyRunning(dagNode)) {
+ return;
+ }
String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
- int proxyQuotaIncrement;
Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for
which quota is increased
StringBuilder requesterMessage = new StringBuilder();
runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
if (proxyUser != null) {
- proxyQuotaIncrement =
incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
proxyUserToJobCount, dagNode, getQuotaForUser(proxyUser));
proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
if (!proxyUserCheck) {
// add 1 to proxyUserIncrement since count starts at 0, and is
negative if quota is exceeded
@@ -90,7 +106,8 @@ public class UserQuotaManager {
List<String> uniqueRequesters =
RequesterService.deserialize(serializedRequesters).stream()
.map(ServiceRequester::getName).distinct().collect(Collectors.toList());
for (String requester : uniqueRequesters) {
- int userQuotaIncrement =
incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(requester, dagNode),
requesterToJobCount, dagNode, getQuotaForUser(requester));
boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
usersQuotaIncrement.add(requester);
requesterCheck = requesterCheck && thisRequesterCheck;
@@ -102,11 +119,20 @@ public class UserQuotaManager {
}
}
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
+ boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests
above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
+ }
+
// Throw errors for reach quota at the end to avoid inconsistent job counts
- if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+ if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
// roll back the increased counts in this block
- String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
- decrementQuotaUsage(proxyUserToJobCount, userKey);
+ decrementQuotaUsage(proxyUserToJobCount,
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
+ decrementQuotaUsage(flowGroupToJobCount,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
decrementQuotaUsageForUsers(usersQuotaIncrement);
runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
throw new IOException(requesterMessage.toString());
@@ -115,14 +141,12 @@ public class UserQuotaManager {
/**
* Increment quota by one for the given map and key.
- * @return a negative number if quota is already reached for this user
- * a positive number if the quota is not reached for this user
+ * @return a negative number if quota is already reached
+ * a positive number if the quota is not reached
* the absolute value of the number is the used quota before this
increment request
* 0 if quota usage is not changed
*/
- private int incrementJobCountAndCheckUserQuota(Map<String, Integer>
quotaMap, String user, Dag.DagNode<JobExecutionPlan> dagNode) {
- String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
-
+ private int incrementJobCountAndCheckQuota(String key, Map<String, Integer>
quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
// Only increment job count for first attempt, since job is considered
running between retries
if (dagNode.getValue().getCurrentAttempts() != 1) {
return 0;
@@ -138,7 +162,7 @@ public class UserQuotaManager {
currentCount = 0;
}
- if (currentCount >= getQuotaForUser(user)) {
+ if (currentCount >= quotaForKey) {
return -currentCount; // increment must have crossed the quota
} else {
return currentCount;
@@ -159,6 +183,9 @@ public class UserQuotaManager {
String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser,
dagNode);
decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
}
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ decrementQuotaUsage(flowGroupToJobCount,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
if (serializedRequesters != null) {
try {
@@ -174,14 +201,14 @@ public class UserQuotaManager {
return true;
}
- private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user)
{
+ private void decrementQuotaUsage(Map<String, Integer> quotaMap, String key) {
Integer currentCount;
- if (user == null) {
+ if (key == null) {
return;
}
do {
- currentCount = quotaMap.get(user);
- } while (currentCount != null && currentCount > 0 &&
!quotaMap.replace(user, currentCount, currentCount - 1));
+ currentCount = quotaMap.get(key);
+ } while (currentCount != null && currentCount > 0 &&
!quotaMap.replace(key, currentCount, currentCount - 1));
}
private void decrementQuotaUsageForUsers(Set<String>
requestersToDecreaseCount) {
@@ -194,4 +221,12 @@ public class UserQuotaManager {
return this.perUserQuota.getOrDefault(user, defaultQuota);
}
+ private int getQuotaForFlowGroup(String flowGroup) {
+ return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
+ }
+
+ private boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode)
{
+ return
this.runningDagIds.containsKey(DagManagerUtils.generateDagId(dagNode));
+ }
+
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 50d2a5e14..79374af33 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -122,8 +122,6 @@ public class JobExecutionPlan {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
//Add flowName to job spec
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
- //Add job name
- .withValue(ConfigurationKeys.JOB_NAME_KEY,
ConfigValueFactory.fromAnyRef(jobName))
//Add flow execution id
.withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
ConfigValueFactory.fromAnyRef(flowExecutionId))
// Remove schedule due to namespace conflict with azkaban schedule
key, but still keep track if flow is scheduled or not
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 3b685011f..c1b0b80ef 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -161,8 +161,8 @@ public class DagManagerTest {
addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION,
flowFailureOption).
- addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build()
- .withFallback(additionalConfig);
+ addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+ jobConfig = additionalConfig.withFallback(jobConfig);
if ((i == 1) || (i == 2)) {
jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0"));
} else if (i == 3) {
@@ -949,6 +949,60 @@ public class DagManagerTest {
}
@Test (dependsOnMethods = "testQuotaDecrement")
+ public void testQuotasRetryFlow() throws URISyntaxException, IOException {
+ List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "user",
ConfigFactory.empty());
+ //Add a dag to the queue of dags
+ this.queue.offer(dagList.get(0));
+ Config jobConfig0 =
dagList.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
+ Config jobConfig1 =
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
+ Iterator<JobStatus> jobStatusIterator0 =
+ getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
true);
+ // Cleanup the running job that is scheduled normally
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group0", String.valueOf(ExecutionStatus.RUNNING), true);
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED));
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus("flow1", "group1",
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED));
+ Iterator<JobStatus> jobStatusIterator5 =
+ getMockJobStatus("flow1", "group1",
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group1", String.valueOf(ExecutionStatus.COMPLETE));
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow0"),
Mockito.eq("group0"), Mockito.anyLong(),
+ Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(jobStatusIterator0)
+ .thenReturn(jobStatusIterator1)
+ .thenReturn(jobStatusIterator2)
+ .thenReturn(jobStatusIterator3);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow1"),
Mockito.eq("group1"), Mockito.anyLong(),
+ Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(jobStatusIterator4)
+ .thenReturn(jobStatusIterator5);
+
+ // Dag1 is running
+ this._dagManagerThread.run();
+ // Dag1 fails and is orchestrated again
+ this._dagManagerThread.run();
+ // Dag1 is running again
+ this._dagManagerThread.run();
+ // Dag1 is marked as complete, should be able to run the next Dag without
hitting the quota limit
+ this._dagManagerThread.run();
+
+ this.queue.offer(dagList.get(1));
+ this._dagManagerThread.run();
+ this._dagManagerThread.run(); // cleanup
+ }
+
+ @Test (dependsOnMethods = "testQuotasRetryFlow")
public void testEmitFlowMetricOnlyIfNotAdhoc() throws URISyntaxException,
IOException {
Long flowId = System.currentTimeMillis();
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
index f3fdf1251..98ebc6bca 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
@@ -21,6 +21,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.util.List;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.testng.Assert;
@@ -35,7 +36,8 @@ public class UserQuotaManagerTest {
@BeforeClass
public void setUp() {
Config quotaConfig = ConfigFactory.empty()
- .withValue(UserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1"));
+ .withValue(UserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
+ .withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA,
ConfigValueFactory.fromAnyRef("group1:1,group2:2"));
this._quotaManager = new UserQuotaManager(quotaConfig);
}
@@ -55,7 +57,7 @@ public class UserQuotaManagerTest {
}
@Test
- public void testExceedsQuotaThrowsException() throws Exception {
+ public void testExceedsUserQuotaThrowsException() throws Exception {
List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user2",
ConfigFactory.empty());
// Ensure that the current attempt is 1, normally done by DagManager
@@ -70,6 +72,7 @@ public class UserQuotaManagerTest {
@Test
public void testMultipleRemoveQuotasIdempotent() throws Exception {
+ // Test that multiple decrements cannot cause the number to decrease by
more than 1
List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user3",
ConfigFactory.empty());
// Ensure that the current attempt is 1, normally done by DagManager
@@ -80,4 +83,55 @@ public class UserQuotaManagerTest {
Assert.assertTrue(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
Assert.assertFalse(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
}
+
+ @Test
+ public void testExceedsFlowGroupQuotaThrowsException() throws Exception {
+ // Test flowgroup quotas
+ List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user4",
ConfigFactory.empty().withValue(
+ ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("group1")));
+
+ // Ensure that the current attempt is 1, normally done by DagManager
+ dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
+ dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
+
+ this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+ Assert.assertThrows(IOException.class, () -> {
+ this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false);
+ });
+ }
+
+
+ @Test
+ public void testUserAndFlowGroupQuotaMultipleUsersAdd() throws Exception {
+ // Test that user quota and group quotas can both be exceeded, and that
decrementing one flow will change both quotas
+ Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("1",
System.currentTimeMillis(),DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 1, "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("group2")));
+ Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("2",
System.currentTimeMillis(),DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 1, "user6",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("group2")));
+ Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("3",
System.currentTimeMillis(),DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 1, "user6",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("group3")));
+ Dag<JobExecutionPlan> dag4 = DagManagerTest.buildDag("4",
System.currentTimeMillis(),DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 1, "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("group2")));
+ // Ensure that the current attempt is 1, normally done by DagManager
+ dag1.getNodes().get(0).getValue().setCurrentAttempts(1);
+ dag2.getNodes().get(0).getValue().setCurrentAttempts(1);
+ dag3.getNodes().get(0).getValue().setCurrentAttempts(1);
+ dag4.getNodes().get(0).getValue().setCurrentAttempts(1);
+
+ this._quotaManager.checkQuota(dag1.getNodes().get(0), false);
+ this._quotaManager.checkQuota(dag2.getNodes().get(0), false);
+
+ // Should fail due to user quota
+ Assert.assertThrows(IOException.class, () -> {
+ this._quotaManager.checkQuota(dag3.getNodes().get(0), false);
+ });
+ // Should fail due to flowgroup quota
+ Assert.assertThrows(IOException.class, () -> {
+ this._quotaManager.checkQuota(dag4.getNodes().get(0), false);
+ });
+ // should pass due to quota being released
+ this._quotaManager.releaseQuota(dag2.getNodes().get(0));
+ this._quotaManager.checkQuota(dag3.getNodes().get(0), false);
+ this._quotaManager.checkQuota(dag4.getNodes().get(0), false);
+ }
}