[ 
https://issues.apache.org/jira/browse/GOBBLIN-1650?focusedWorklogId=774260&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774260
 ]

ASF GitHub Bot logged work on GOBBLIN-1650:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/22 22:09
            Start Date: 24/May/22 22:09
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3511:
URL: https://github.com/apache/gobblin/pull/3511#discussion_r880986699


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -41,39 +42,50 @@
 @Slf4j
 public class UserQuotaManager {
   public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
   public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
   public static final String QUOTA_SEPERATOR = ":";
   public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
+  private final Map<String, Integer> flowGroupToJobCount = new 
ConcurrentHashMap<>();
   private final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
   private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
   Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
   private final int defaultQuota;
 
   UserQuotaManager(Config config) {
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
-    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
-
+    ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
+    ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
+    // Quotas will take form of user:flowGroup:<Quota>

Review Comment:
   Am I interpreting this correctly? Each user has a quota (let's call it X) 
that can be applied across flowGroups. X can be distributed among flows for 
different groups or all go to one flow group's flows? At a higher level each 
flowGroup also has a max quota that is enforced, which is why the user quota 
check occurs prior to the group level check in the code below. It would be 
useful to explain this in a comment above the checkQuota function or to explain 
this mapping better here.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -102,11 +115,20 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan> 
dagNode, boolean onInit) th
       }
     }
 
+    int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+        DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
+    boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
+    if (!flowGroupCheck) {
+      requesterMessage.append(String.format(
+          "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests 
above quota=%d%n",
+          flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), 
Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
+    }
+
     // Throw errors for reach quota at the end to avoid inconsistent job counts
-    if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+    if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
       // roll back the increased counts in this block
-      String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
-      decrementQuotaUsage(proxyUserToJobCount, userKey);
+      decrementQuotaUsage(proxyUserToJobCount, 
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));

Review Comment:
   why do we decrement the quota usage at this point? Should it only be 
decremented after the flow finishes...?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -194,4 +217,12 @@ private int getQuotaForUser(String user) {
     return this.perUserQuota.getOrDefault(user, defaultQuota);
   }
 
+  private int getQuotaForFlowGroup(String flowGroup) {
+    return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
+  }
+
+  public boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode) {

Review Comment:
   should we use this function to decrement/increment quotas?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 774260)
    Time Spent: 40m  (was: 0.5h)

> Allow GaaS to enforce quotas by flowgroup
> -----------------------------------------
>
>                 Key: GOBBLIN-1650
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1650
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Certain GaaS flows can have a large number of flows that can cause 
> instability on dependent services. We want to be able to control the 
> throughput of flows from a flowgroup granularity on top of a user 
> granularity. We keep the quota configuration separate as there can be many 
> users submitting flows to the same flowgroup.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to