[ 
https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=813532&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-813532
 ]

ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Sep/22 00:39
            Start Date: 30/Sep/22 00:39
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r984119194


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -47,55 +50,261 @@
 @Slf4j
 @Singleton
 public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
-  private final MysqlQuotaStore mysqlStore;
+  public final MysqlQuotaStore quotaStore;
+  public final RunningDagIdsStore runningDagIds;
+
 
   @Inject
   public MysqlUserQuotaManager(Config config) throws IOException {
     super(config);
-    this.mysqlStore = createQuotaStore(config);
+    this.quotaStore = createQuotaStore(config);
+    this.runningDagIds = createRunningDagStore(config);
+  }
+
+  void addDagId(Connection connection, String dagId) throws IOException {
+    this.runningDagIds.add(connection, dagId);
+  }
+
+  @Override
+  boolean containsDagId(String dagId) throws IOException {
+    return this.runningDagIds.contains(dagId);
+  }
+
+  boolean removeDagId(Connection connection, String dagId) throws IOException {
+    return this.runningDagIds.remove(connection, dagId);
   }
 
   // This implementation does not need to update quota usage when the service 
restarts or it's leadership status changes
   public void init(Collection<Dag<JobExecutionPlan>> dags) {
   }
 
   @Override
-  int incrementJobCount(String user, CountType countType) throws IOException {
-    try {
-      return this.mysqlStore.increaseCount(user, countType);
+  public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) 
throws IOException {
+    try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+      connection.setAutoCommit(false);
+
+      for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
+        QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
+        if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || 
!quotaCheck.flowGroupCheck)) {
+          connection.rollback();
+          throw new QuotaExceededException(quotaCheck.requesterMessage);
+        }
+      }
+      connection.commit();
     } catch (SQLException e) {
       throw new IOException(e);
     }
   }
 
+  int incrementJobCount(Connection connection, String user, CountType 
countType) throws IOException, SQLException {
+    return this.quotaStore.increaseCount(connection, user, countType);
+  }
+
+  void decrementJobCount(Connection connection,String user, CountType 
countType) throws IOException, SQLException {
+      this.quotaStore.decreaseCount(connection, user, countType);
+  }
+
+  protected QuotaCheck increaseAndCheckQuota(Connection connection, 
Dag.DagNode<JobExecutionPlan> dagNode)
+      throws SQLException, IOException {
+    QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+    StringBuilder requesterMessage = new StringBuilder();
+
+    // Dag is already being tracked, no need to double increment for retries 
and multihop flows
+    if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+      return quotaCheck;
+    } else {
+      addDagId(connection, DagManagerUtils.generateDagId(dagNode).toString());
+    }
+
+    String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+        ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+    boolean proxyUserCheck;
+
+    if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
+      quotaCheck.setProxyUserCheck(proxyUserCheck);
+      if (!proxyUserCheck) {
+        // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count 
before the increment
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, 
requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      List<String> uniqueRequesters = 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckQuota(connection, 
DagManagerUtils.getUserQuotaKey(requester, dagNode),
+            getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        quotaCheck.setRequesterCheck(requesterCheck);
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format("Quota exceeded for requester 
%s on executor %s : quota=%s, requests above quota=%d%n. ",
+              requester, specExecutorUri, getQuotaForUser(requester), 
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+        }
+      }
+    }
+
+    boolean flowGroupCheck;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+      flowGroupCheck = flowGroupQuotaIncrement >= 0;
+      quotaCheck.setFlowGroupCheck(flowGroupCheck);
+      if (!flowGroupCheck) {
+        requesterMessage.append(String.format("Quota exceeded for flowgroup %s 
on executor %s : quota=%s, requests above quota=%d%n",
+            flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+            Math.abs(flowGroupQuotaIncrement) + 1 - 
getQuotaForFlowGroup(flowGroup)));
+      }
+    }
+
+    quotaCheck.setRequesterMessage(requesterMessage.toString());
+
+    return quotaCheck;
+  }
+
   @Override
-  void decrementJobCount(String user, CountType countType) throws IOException {
+  protected void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode) 
throws IOException {

Review Comment:
   Do we need this method at all in the MySQL quota manager? It does not seem 
to be called anywhere.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 813532)
    Time Spent: 1h 40m  (was: 1.5h)

> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
>                 Key: GOBBLIN-1703
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> When a GaaS flow request arrives, the resource handler checks the quota 
> immediately. However, if the flow has runImmediately=true, the quota is 
> checked again when the first job starts. This second check should be avoided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to