[
https://issues.apache.org/jira/browse/GOBBLIN-1650?focusedWorklogId=774260&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774260
]
ASF GitHub Bot logged work on GOBBLIN-1650:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/22 22:09
Start Date: 24/May/22 22:09
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3511:
URL: https://github.com/apache/gobblin/pull/3511#discussion_r880986699
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -41,39 +42,50 @@
@Slf4j
public class UserQuotaManager {
public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
public static final String QUOTA_SEPERATOR = ":";
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+ private final Map<String, Integer> flowGroupToJobCount = new ConcurrentHashMap<>();
private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
private final int defaultQuota;
UserQuotaManager(Config config) {
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
- ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
-
+ ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder();
+ ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder();
+ // Quotas will take form of user:flowGroup:<Quota>
Review Comment:
Am I interpreting this correctly? Each user has a quota (call it X) that
applies across flowGroups: X can be split among flows in different groups, or
spent entirely on one flowGroup's flows. On top of that, each flowGroup has its
own maximum quota, which is why the user quota check occurs before the
group-level check in the code below. It would be useful to explain this in a
comment above the checkQuota function, or to explain the mapping better here.
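If that reading is right, the intended semantics could be sketched roughly as
follows. This is only an illustration, not the PR's code: the class
TwoLevelQuotaSketch, the method tryAdmit, and the sample users, groups, and
limits are all hypothetical. For simplicity it checks both limits before
counting the job, which differs from the increment-then-check order in the
diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names and limits are illustrative, not Gobblin's API.
final class TwoLevelQuotaSketch {
  private final Map<String, Integer> userLimit;
  private final Map<String, Integer> flowGroupLimit;
  private final int defaultLimit;
  private final Map<String, Integer> userCount = new HashMap<>();
  private final Map<String, Integer> flowGroupCount = new HashMap<>();

  TwoLevelQuotaSketch(Map<String, Integer> userLimit,
      Map<String, Integer> flowGroupLimit, int defaultLimit) {
    this.userLimit = userLimit;
    this.flowGroupLimit = flowGroupLimit;
    this.defaultLimit = defaultLimit;
  }

  // Admits a job only if both the user's limit and the flow group's limit have room.
  synchronized boolean tryAdmit(String user, String flowGroup) {
    int usedByUser = userCount.getOrDefault(user, 0);
    int usedByGroup = flowGroupCount.getOrDefault(flowGroup, 0);
    // User-level check first: the user's quota is shared across all flow groups.
    if (usedByUser >= userLimit.getOrDefault(user, defaultLimit)) {
      return false;
    }
    // Group-level check second: the group's cap is shared across all users.
    if (usedByGroup >= flowGroupLimit.getOrDefault(flowGroup, defaultLimit)) {
      return false;
    }
    userCount.put(user, usedByUser + 1);
    flowGroupCount.put(flowGroup, usedByGroup + 1);
    return true;
  }

  public static void main(String[] args) {
    TwoLevelQuotaSketch quotas = new TwoLevelQuotaSketch(
        Map.of("alice", 3), Map.of("groupA", 2), Integer.MAX_VALUE);
    System.out.println(quotas.tryAdmit("alice", "groupA")); // true
    System.out.println(quotas.tryAdmit("alice", "groupA")); // true
    System.out.println(quotas.tryAdmit("alice", "groupA")); // false: groupA is at its cap of 2
    System.out.println(quotas.tryAdmit("alice", "groupB")); // true: alice has used 2 of 3
    System.out.println(quotas.tryAdmit("alice", "groupB")); // false: alice is at her cap of 3
    System.out.println(quotas.tryAdmit("bob", "groupA"));   // false: groupA's cap applies to every user
  }
}
```

In this sketch a user can spread their quota over several groups or spend it
all in one, and a full flowGroup rejects every user regardless of their
remaining personal quota.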
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -102,11 +115,20 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) th
}
}
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
+ boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
+ }
+
// Throw errors for reach quota at the end to avoid inconsistent job counts
- if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+ if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
// roll back the increased counts in this block
- String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
- decrementQuotaUsage(proxyUserToJobCount, userKey);
+ decrementQuotaUsage(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
Review Comment:
Why do we decrement the quota usage at this point? Shouldn't it only be
decremented after the flow finishes?
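One possible reading, going by the "roll back the increased counts in this
block" comment in the diff: the counts are incremented before the checks are
evaluated, so a rejected request has to undo its own increments, separately
from the decrement that happens when a flow finishes. A minimal sketch of that
pattern, assuming this reading is correct; RollbackQuotaSketch, tryStart, and
finish are hypothetical names and this is not the PR's implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "increment first, roll back on rejection".
final class RollbackQuotaSketch {
  private final Map<String, Integer> userCount = new ConcurrentHashMap<>();
  private final Map<String, Integer> flowGroupCount = new ConcurrentHashMap<>();
  private final int userLimit;
  private final int flowGroupLimit;

  RollbackQuotaSketch(int userLimit, int flowGroupLimit) {
    this.userLimit = userLimit;
    this.flowGroupLimit = flowGroupLimit;
  }

  // Atomically increments the counter and reports whether the new count fits the limit.
  private static boolean incrementAndCheck(Map<String, Integer> counts, String key, int limit) {
    return counts.merge(key, 1, Integer::sum) <= limit;
  }

  // Atomically decrements the counter, dropping the entry once it reaches zero.
  private static void decrement(Map<String, Integer> counts, String key) {
    counts.computeIfPresent(key, (k, v) -> v > 1 ? v - 1 : null);
  }

  boolean tryStart(String user, String flowGroup) {
    // Both counters are bumped before either result is acted on.
    boolean userOk = incrementAndCheck(userCount, user, userLimit);
    boolean groupOk = incrementAndCheck(flowGroupCount, flowGroup, flowGroupLimit);
    if (!userOk || !groupOk) {
      // Rejected: undo both increments so the failed request does not consume quota.
      decrement(userCount, user);
      decrement(flowGroupCount, flowGroup);
      return false;
    }
    return true;
  }

  // Called when an admitted job finishes; this is the usual end-of-flow decrement.
  void finish(String user, String flowGroup) {
    decrement(userCount, user);
    decrement(flowGroupCount, flowGroup);
  }

  int userUsage(String user) {
    return userCount.getOrDefault(user, 0);
  }

  int flowGroupUsage(String flowGroup) {
    return flowGroupCount.getOrDefault(flowGroup, 0);
  }
}
```

Without the rollback in tryStart, a request rejected by the flowGroup check
would still leave the user's count one higher. Note that each counter update
here is atomic, but the pair of updates is not.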
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -194,4 +217,12 @@ private int getQuotaForUser(String user) {
return this.perUserQuota.getOrDefault(user, defaultQuota);
}
+ private int getQuotaForFlowGroup(String flowGroup) {
+ return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
+ }
+
+ public boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode) {
Review Comment:
Should we use this function to decrement/increment quotas?
Issue Time Tracking
-------------------
Worklog Id: (was: 774260)
Time Spent: 40m (was: 0.5h)
> Allow GaaS to enforce quotas by flowgroup
> -----------------------------------------
>
> Key: GOBBLIN-1650
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1650
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: William Lo
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Certain GaaS flowgroups can have a large number of flows, which can cause
> instability on dependent services. We want to be able to control the
> throughput of flows at flowgroup granularity, on top of user granularity.
> We keep the quota configurations separate as there can be many users
> submitting flows to the same flowgroup.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)