umustafi commented on code in PR #3511:
URL: https://github.com/apache/gobblin/pull/3511#discussion_r880986699


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -41,39 +42,50 @@
 @Slf4j
 public class UserQuotaManager {
   public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
   public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
   public static final String QUOTA_SEPERATOR = ":";
   public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> flowGroupToJobCount = new ConcurrentHashMap<>();
   private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
   private final Map<String, Integer> perUserQuota;
+  private final Map<String, Integer> perFlowGroupQuota;
   Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
   private final int defaultQuota;
 
   UserQuotaManager(Config config) {
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
-    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
-
+    ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder();
+    // Quotas will take form of user:flowGroup:<Quota>

Review Comment:
   Am I interpreting this correctly? Each user has a quota (call it X) that is 
shared across flowGroups. Can X be spread over flows in different groups, or 
spent entirely on a single flowGroup's flows? On top of that, each flowGroup 
has its own maximum quota, which is why the user quota check runs before the 
flowGroup check in the code below. It would be useful to explain this in a 
comment above the checkQuota function, or to describe the mapping more clearly 
here.
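
   To make the interpretation above concrete, here is a minimal sketch of a 
two-level quota check. It is not the PR's implementation: the class 
`LayeredQuotaSketch` and the method `tryAcquire` are hypothetical names, and 
the sketch assumes a job is admitted only when both the user's quota and the 
flowGroup's quota still have room.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the UserQuotaManager from the PR.
class LayeredQuotaSketch {
  private final Map<String, Integer> perUserQuota;
  private final Map<String, Integer> perFlowGroupQuota;
  private final int defaultQuota;
  private final Map<String, Integer> userToJobCount = new HashMap<>();
  private final Map<String, Integer> flowGroupToJobCount = new HashMap<>();

  LayeredQuotaSketch(Map<String, Integer> perUserQuota,
      Map<String, Integer> perFlowGroupQuota, int defaultQuota) {
    this.perUserQuota = perUserQuota;
    this.perFlowGroupQuota = perFlowGroupQuota;
    this.defaultQuota = defaultQuota;
  }

  // Records the job and returns true only if both the user-level and the
  // flowGroup-level quota have room; otherwise leaves the counts untouched.
  synchronized boolean tryAcquire(String user, String flowGroup) {
    int userCount = userToJobCount.getOrDefault(user, 0);
    int groupCount = flowGroupToJobCount.getOrDefault(flowGroup, 0);
    boolean userOk = userCount < perUserQuota.getOrDefault(user, defaultQuota);
    boolean groupOk = groupCount < perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
    if (!userOk || !groupOk) {
      return false;
    }
    userToJobCount.put(user, userCount + 1);
    flowGroupToJobCount.put(flowGroup, groupCount + 1);
    return true;
  }
}
```

   Under these assumptions, a user with quota 3 can run at most 2 jobs in a 
flowGroup whose quota is 2, and can spend the remaining slot in any other 
flowGroup that still has capacity.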



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -102,11 +115,20 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) th
       }
     }
 
+    int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+        DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
+    boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
+    if (!flowGroupCheck) {
+      requesterMessage.append(String.format(
+          "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+          flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
+    }
+
     // Throw errors for reach quota at the end to avoid inconsistent job counts
-    if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+    if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
       // roll back the increased counts in this block
-      String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
-      decrementQuotaUsage(proxyUserToJobCount, userKey);
+      decrementQuotaUsage(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));

Review Comment:
   Why do we decrement the quota usage at this point? Should it be decremented 
only after the flow finishes?
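
   One possible reading, sketched here under assumptions rather than taken 
from the PR: the counts are incremented before the quota comparison, so when 
any check fails the job is rejected and the increments made in this block have 
to be undone. The class `RollbackSketch` and its methods are hypothetical, and 
the fixed `userQuota` and `groupQuota` fields stand in for the real quota 
lookups.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of increment-then-rollback; not the PR's code.
class RollbackSketch {
  final Map<String, Integer> userCount = new ConcurrentHashMap<>();
  final Map<String, Integer> groupCount = new ConcurrentHashMap<>();
  private final int userQuota;
  private final int groupQuota;

  RollbackSketch(int userQuota, int groupQuota) {
    this.userQuota = userQuota;
    this.groupQuota = groupQuota;
  }

  // Increments the count first, then reports whether it is still within quota.
  private static boolean incrementAndCheck(Map<String, Integer> counts, String key, int quota) {
    int updated = counts.merge(key, 1, Integer::sum);
    return updated <= quota;
  }

  // Undoes one increment; removes the entry once the count reaches zero.
  private static void decrement(Map<String, Integer> counts, String key) {
    counts.computeIfPresent(key, (k, v) -> v > 1 ? v - 1 : null);
  }

  // Runs every check before deciding, so all counters are incremented together
  // and, on rejection, rolled back together.
  boolean admit(String user, String group) {
    boolean userOk = incrementAndCheck(userCount, user, userQuota);
    boolean groupOk = incrementAndCheck(groupCount, group, groupQuota);
    if (!userOk || !groupOk) {
      decrement(userCount, user);
      decrement(groupCount, group);
      return false;
    }
    return true;
  }
}
```

   In this sketch, skipping the rollback would leave a rejected job counted 
against the user even though it never ran. The decrement for jobs that were 
admitted would still happen separately, when the flow finishes.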



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -194,4 +217,12 @@ private int getQuotaForUser(String user) {
     return this.perUserQuota.getOrDefault(user, defaultQuota);
   }
 
+  private int getQuotaForFlowGroup(String flowGroup) {
+    return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
+  }
+
+  public boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode) {

Review Comment:
   Should we use this function to decrement/increment quotas?


