Will-Lo commented on code in PR #3511:
URL: https://github.com/apache/gobblin/pull/3511#discussion_r881050400


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -102,11 +115,20 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) th
       }
     }
 
+    int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+        DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
+    boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
+    if (!flowGroupCheck) {
+      requesterMessage.append(String.format(
+          "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+          flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
+    }
+
     // Throw errors for reach quota at the end to avoid inconsistent job counts
-    if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+    if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
       // roll back the increased counts in this block
-      String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
-      decrementQuotaUsage(proxyUserToJobCount, userKey);
+      decrementQuotaUsage(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));

Review Comment:
   Because of concurrency, it isn't safe to check the quota and then increment 
unless we hold some sort of lock. Instead we allow concurrent updates and then 
check the value that we just wrote. If that value exceeds the quota, we force a 
decrement on the quota at the end and the flow is marked as a failure. 
   
   Without guards around it, this would lead to a double decrement. We decrement 
here for a faster turnaround, so that we don't need to wait for the Kafka event 
to be ingested before freeing up the quota. But decrementing quotas is 
idempotent per dagId: right below we call 
`runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));`, so the 
decrement only occurs once.
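
To make the pattern above concrete, here is a minimal standalone sketch of "increment first, then check the updated value, and roll back idempotently". It is not the actual `UserQuotaManager` code: the class `QuotaSketch`, its methods `tryAcquire`, `release` and `usage`, and the simplification of one job per dag are all assumptions made for illustration. Only the `runningDagIds` name is borrowed from the comment.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not the Gobblin implementation.
final class QuotaSketch {
  private final ConcurrentHashMap<String, AtomicInteger> jobCounts = new ConcurrentHashMap<>();
  // Dag ids that currently hold a quota slot; removal acts as the guard.
  private final Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
  private final int quota;

  QuotaSketch(int quota) {
    this.quota = quota;
  }

  /** Increments without a lock, then checks the value this call produced. */
  boolean tryAcquire(String key, String dagId) {
    runningDagIds.add(dagId);
    int updated = jobCounts.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
    if (updated > quota) {
      // Over quota: roll back right away instead of waiting for a later event.
      release(key, dagId);
      return false;
    }
    return true;
  }

  /** Decrements at most once per dagId; later calls for the same id are no-ops. */
  boolean release(String key, String dagId) {
    if (!runningDagIds.remove(dagId)) {
      return false; // already released, so skip the second decrement
    }
    AtomicInteger count = jobCounts.get(key);
    if (count != null) {
      count.decrementAndGet();
    }
    return true;
  }

  int usage(String key) {
    AtomicInteger count = jobCounts.get(key);
    return count == null ? 0 : count.get();
  }
}
```

In this sketch, a late `release` for a dag that was already rolled back (for example, one triggered by a delayed event) finds the id missing from `runningDagIds` and leaves the count untouched, which is the guard against the double decrement.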


