[
https://issues.apache.org/jira/browse/GOBBLIN-1650?focusedWorklogId=774283&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774283
]
ASF GitHub Bot logged work on GOBBLIN-1650:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/22 23:11
Start Date: 24/May/22 23:11
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3511:
URL: https://github.com/apache/gobblin/pull/3511#discussion_r881050400
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -102,11 +115,20 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) th
}
}
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
+ boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
+ }
+
// Throw errors for reach quota at the end to avoid inconsistent job counts
- if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+ if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
// roll back the increased counts in this block
- String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
- decrementQuotaUsage(proxyUserToJobCount, userKey);
+ decrementQuotaUsage(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
Review Comment:
Due to concurrency, it's not safe to check the quota and then increment it
unless we hold some sort of lock. Instead, we allow concurrent updates and then
check the value that we updated. If the quota is exceeded, we force a decrement
at the end and the flow is marked as failed.
Without guards, this could lead to a double decrement, since the quota is also
freed later when the Kafka event for the flow is ingested. Decrementing here
gives a faster turnaround, so that we don't need to wait for that Kafka event
before freeing up the quota. However, decrementing the quota is idempotent for
a given dagId: right below, we call
`runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));`, so the
decrement only occurs once.
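A minimal Java sketch of the increment-then-check pattern described in the comment, assuming a ConcurrentHashMap of running-job counts per quota key and a concurrent set of running DAG ids that acts as the "decrement once" guard. The class and method names (QuotaTrackerSketch, tryAcquire, release) are hypothetical and are not part of UserQuotaManager; the sketch only illustrates why removing the dagId before decrementing makes the rollback idempotent.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "increment, then check, then roll back" quota tracking.
// Names are illustrative only, not Gobblin APIs.
final class QuotaTrackerSketch {
    // Running job count per quota key (e.g. a user key or a flowgroup key).
    private final ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
    // DAG ids currently holding a slot; removing an id is the "decrement once" guard.
    private final Set<String> runningDagIds = ConcurrentHashMap.newKeySet();

    /** Optimistically takes a slot; returns false (after rolling back) if the quota is exceeded. */
    boolean tryAcquire(String dagId, String quotaKey, int quota) {
        runningDagIds.add(dagId);
        // merge() is atomic on ConcurrentHashMap, so each concurrent caller sees
        // a count that already includes its own increment; no explicit lock needed.
        int newCount = counts.merge(quotaKey, 1, Integer::sum);
        if (newCount > quota) {
            release(dagId, quotaKey); // fast rollback instead of waiting for a later event
            return false;
        }
        return true;
    }

    /** Frees a slot; safe to call more than once for the same dagId. */
    void release(String dagId, String quotaKey) {
        // Only the caller that actually removes the dagId decrements the count,
        // so a rollback followed by a later completion event cannot double-decrement.
        if (runningDagIds.remove(dagId)) {
            counts.merge(quotaKey, -1, Integer::sum);
        }
    }

    int count(String quotaKey) {
        return counts.getOrDefault(quotaKey, 0);
    }

    public static void main(String[] args) {
        QuotaTrackerSketch tracker = new QuotaTrackerSketch();
        System.out.println(tracker.tryAcquire("dag-1", "groupA", 1)); // true
        System.out.println(tracker.tryAcquire("dag-2", "groupA", 1)); // false (rolled back)
        tracker.release("dag-2", "groupA"); // no-op: already released by the rollback
        tracker.release("dag-1", "groupA");
        tracker.release("dag-1", "groupA"); // no-op: duplicate completion event
        System.out.println(tracker.count("groupA")); // 0
    }
}
```

One trade-off of this lock-free design: a request that is over quota briefly holds its increment before rolling back, so a concurrent request can occasionally be rejected even though it would have fit. In exchange, each accepted request has seen a count that includes every other accepted request, so the number of admitted flows should not exceed the quota.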
Issue Time Tracking
-------------------
Worklog Id: (was: 774283)
Time Spent: 1h 10m (was: 1h)
> Allow GaaS to enforce quotas by flowgroup
> -----------------------------------------
>
> Key: GOBBLIN-1650
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1650
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: William Lo
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Certain GaaS flowgroups can contain a large number of flows, which can cause
> instability in dependent services. We want to be able to control the
> throughput of flows at flowgroup granularity, in addition to user
> granularity. We keep the quota configuration separate, since many users can
> submit flows to the same flowgroup.
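To illustrate keeping the two quota configurations separate, here is a hypothetical Java sketch (Java 9+ for Map.of) in which a flow may start only if it fits both its proxy user's quota and its flowgroup's quota, with both counters rolled back otherwise, in the spirit of the diff above. DualQuotaSketch, tryStart and the single default-quota fallback are assumptions for illustration, not the actual GaaS configuration keys or API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: user quotas and flowgroup quotas are configured separately,
// and a flow may start only if it fits BOTH. Names are illustrative, not Gobblin APIs.
final class DualQuotaSketch {
    private final Map<String, Integer> userQuotas;      // limit per proxy user
    private final Map<String, Integer> flowGroupQuotas; // limit per flowgroup, shared by all its users
    private final int defaultQuota;                     // used when a key has no explicit limit
    private final ConcurrentHashMap<String, Integer> userCounts = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Integer> flowGroupCounts = new ConcurrentHashMap<>();

    DualQuotaSketch(Map<String, Integer> userQuotas, Map<String, Integer> flowGroupQuotas, int defaultQuota) {
        this.userQuotas = userQuotas;
        this.flowGroupQuotas = flowGroupQuotas;
        this.defaultQuota = defaultQuota;
    }

    boolean tryStart(String user, String flowGroup) {
        // Increment both counters first, then check each against its own limit.
        int userCount = userCounts.merge(user, 1, Integer::sum);
        int groupCount = flowGroupCounts.merge(flowGroup, 1, Integer::sum);
        boolean userOk = userCount <= userQuotas.getOrDefault(user, defaultQuota);
        boolean groupOk = groupCount <= flowGroupQuotas.getOrDefault(flowGroup, defaultQuota);
        if (userOk && groupOk) {
            return true;
        }
        // Either check failed: roll back both increments so the counts stay consistent.
        userCounts.merge(user, -1, Integer::sum);
        flowGroupCounts.merge(flowGroup, -1, Integer::sum);
        return false;
    }

    int userCount(String user) {
        return userCounts.getOrDefault(user, 0);
    }

    int flowGroupCount(String flowGroup) {
        return flowGroupCounts.getOrDefault(flowGroup, 0);
    }

    public static void main(String[] args) {
        DualQuotaSketch quotas = new DualQuotaSketch(
            Map.of("alice", 5, "bob", 5), Map.of("groupA", 2), 10);
        System.out.println(quotas.tryStart("alice", "groupA")); // true
        System.out.println(quotas.tryStart("bob", "groupA"));   // true
        // alice still has user quota left, but groupA is full across all of its users.
        System.out.println(quotas.tryStart("alice", "groupA")); // false
        System.out.println(quotas.tryStart("bob", "groupB"));   // true (default quota)
        System.out.println(quotas.flowGroupCount("groupA"));    // 2
    }
}
```

The third call shows why the flowgroup limit is configured independently: no single user is over quota, yet the flowgroup as a whole is capped.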
--
This message was sent by Atlassian Jira
(v8.20.7#820007)