umustafi commented on code in PR #3511:
URL: https://github.com/apache/gobblin/pull/3511#discussion_r880986699
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -41,39 +42,50 @@
@Slf4j
public class UserQuotaManager {
public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
public static final String QUOTA_SEPERATOR = ":";
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+ private final Map<String, Integer> flowGroupToJobCount = new ConcurrentHashMap<>();
private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
private final int defaultQuota;
UserQuotaManager(Config config) {
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
- ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
-
+ ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder();
+ ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder();
+ // Quotas will take form of user:flowGroup:<Quota>
Review Comment:
Am I interpreting this correctly? Each user has a quota (call it X) that applies
across flowGroups, so X can be split among flows in different groups or used
entirely by one flowGroup's flows. On top of that, each flowGroup has its own
maximum quota that is enforced separately, which is why the user quota check
happens before the flowGroup check in the code below. It would be useful to
explain this in a comment above the checkQuota function, or to describe this
mapping more clearly here.
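To make the interpretation above concrete, here is a minimal, hypothetical sketch (not the code in PR #3511): a per-user count shared across all flow groups, plus an independent per-flowGroup count, with a job admitted only if both have room. The class and method names (`TwoLevelQuotaSketch`, `tryAdmit`) are illustrative assumptions, and plain `String` user/flowGroup names stand in for Gobblin's `Dag.DagNode<JobExecutionPlan>`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of two-level quotas: a user's quota is shared across all of
 * that user's flow groups, and each flow group also has its own independent cap.
 */
class TwoLevelQuotaSketch {
  private final Map<String, Integer> userQuota;
  private final Map<String, Integer> flowGroupQuota;
  private final int defaultQuota;
  private final Map<String, Integer> userCounts = new HashMap<>();
  private final Map<String, Integer> flowGroupCounts = new HashMap<>();

  TwoLevelQuotaSketch(Map<String, Integer> userQuota, Map<String, Integer> flowGroupQuota, int defaultQuota) {
    this.userQuota = new HashMap<>(userQuota);
    this.flowGroupQuota = new HashMap<>(flowGroupQuota);
    this.defaultQuota = defaultQuota;
  }

  /** Admits a job only if both the user's shared quota and the flow group's quota have room. */
  synchronized boolean tryAdmit(String user, String flowGroup) {
    int userUsed = userCounts.getOrDefault(user, 0);
    int groupUsed = flowGroupCounts.getOrDefault(flowGroup, 0);
    boolean userOk = userUsed < userQuota.getOrDefault(user, defaultQuota);
    boolean groupOk = groupUsed < flowGroupQuota.getOrDefault(flowGroup, defaultQuota);
    if (!userOk || !groupOk) {
      return false; // nothing was incremented, so nothing needs to be rolled back
    }
    userCounts.put(user, userUsed + 1);
    flowGroupCounts.put(flowGroup, groupUsed + 1);
    return true;
  }
}
```

With a user quota of 2 and a flowGroup `g1` cap of 1, a user's second `g1` job is rejected by the group cap even though the user still has room, while a job in another group is still admitted until the user's shared quota of 2 is used up. This variant checks both limits before touching any counter; the PR instead increments first and rolls back on failure.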
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -102,11 +115,20 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) th
}
}
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
+ boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
+ }
+
// Throw errors for reach quota at the end to avoid inconsistent job counts
- if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+ if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
// roll back the increased counts in this block
- String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
- decrementQuotaUsage(proxyUserToJobCount, userKey);
+ decrementQuotaUsage(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
Review Comment:
Why do we decrement the quota usage at this point? Shouldn't it only be
decremented after the flow finishes?
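The diff's own comments say the counts increased in this block are rolled back when a check fails and `!onInit`. A minimal, hypothetical sketch of that "increment, check, roll back" pattern separates the two decrement paths: an immediate rollback when a job is rejected (it will never run, so it must not hold quota) and the normal release when an admitted flow finishes. The names `RollbackQuotaSketch`, `acquire`, `release`, and the `user:`/`group:` key prefixes are assumptions for illustration, not Gobblin's API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: counters are incremented before checking; if any quota is
 * exceeded, this call's increments are undone immediately because the job is rejected.
 * The decrement for an admitted job happens later, in release(), when it finishes.
 */
class RollbackQuotaSketch {
  private final Map<String, Integer> counts = new HashMap<>();
  private final Map<String, Integer> quotas;

  RollbackQuotaSketch(Map<String, Integer> quotas) {
    this.quotas = new HashMap<>(quotas);
  }

  private int increment(String key) {
    return counts.merge(key, 1, Integer::sum);
  }

  private void decrement(String key) {
    // Drop the entry once it reaches zero so the map does not accumulate stale keys.
    counts.computeIfPresent(key, (k, v) -> v > 1 ? v - 1 : null);
  }

  synchronized int count(String key) {
    return counts.getOrDefault(key, 0);
  }

  /** Increments every key, then rolls all of them back if any quota is exceeded. */
  synchronized boolean acquire(String... keys) {
    boolean allWithinQuota = true;
    for (String key : keys) {
      if (increment(key) > quotas.getOrDefault(key, Integer.MAX_VALUE)) {
        allWithinQuota = false; // keep going so every key is handled uniformly
      }
    }
    if (!allWithinQuota) {
      for (String key : keys) {
        decrement(key); // rejected job: undo only the increments made in this call
      }
    }
    return allWithinQuota;
  }

  /** Called when a previously admitted job finishes. */
  synchronized void release(String... keys) {
    for (String key : keys) {
      decrement(key);
    }
  }
}
```

Under this reading, the rollback at this point does not replace the decrement after the flow finishes; it only undoes the increments of a request that was rejected.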
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -194,4 +217,12 @@ private int getQuotaForUser(String user) {
return this.perUserQuota.getOrDefault(user, defaultQuota);
}
+ private int getQuotaForFlowGroup(String flowGroup) {
+ return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
+ }
+
+ public boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode) {
Review Comment:
Should we use this function to decide when to increment/decrement quotas?
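One way to read this suggestion, sketched hypothetically: gate quota changes on whether a DAG is already tracked as running, so a DAG that is seen twice (for example on re-initialization) is counted once, and a DAG that was never counted is never decremented. `RunningDagQuotaSketch`, `onDagStart`, and `onDagFinish` are illustrative names; a `String` dagId replaces `Dag.DagNode<JobExecutionPlan>`, and a concurrent key set stands in for the `runningDagIds` map in the diff.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch: quota counts change only on the first start and the first
 * finish of a given DAG, using the running-DAG set as the guard.
 */
class RunningDagQuotaSketch {
  private final Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
  private final Map<String, Integer> userCounts = new ConcurrentHashMap<>();

  boolean isDagCurrentlyRunning(String dagId) {
    return runningDagIds.contains(dagId);
  }

  /** Increments the user's count only if this DAG was not already tracked as running. */
  void onDagStart(String dagId, String user) {
    if (runningDagIds.add(dagId)) { // add() returns false if the DAG was already present
      userCounts.merge(user, 1, Integer::sum);
    }
  }

  /** Decrements only if the DAG was previously counted as running. */
  void onDagFinish(String dagId, String user) {
    if (runningDagIds.remove(dagId)) { // remove() returns false if it was never tracked
      userCounts.computeIfPresent(user, (k, v) -> v > 1 ? v - 1 : null);
    }
  }

  int count(String user) {
    return userCounts.getOrDefault(user, 0);
  }
}
```

Each set operation and map update is atomic on its own, but the pair is not; a real implementation would need to decide whether that gap matters.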