[ 
https://issues.apache.org/jira/browse/GOBBLIN-1624?focusedWorklogId=755439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755439
 ]

ASF GitHub Bot logged work on GOBBLIN-1624:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Apr/22 21:03
            Start Date: 11/Apr/22 21:03
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3481:
URL: https://github.com/apache/gobblin/pull/3481#discussion_r847733755


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Manages the statically configured user quotas for both the proxy user in the user.to.proxy configuration and the API requester(s).
+ * Used by the DagManager to ensure that the number of currently running jobs does not exceed the quota; if the quota
+ * is exceeded, the execution fails without running on the underlying executor.
+ */
+@Slf4j
+public class UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
+  public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> perUserQuota;
+  Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
+  private final int defaultQuota;
+
+  UserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
+    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
+
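+    // Each entry in the per-user quota list is expected in "<user>:<quota>" form, e.g. "someUser:10" (illustrative value)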
+    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
+      mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+    }
+    this.perUserQuota = mapBuilder.build();
+  }
+
+  /**
+   * Checks if the dagNode exceeds the statically configured user quota for both the proxy user and requester user
+   * @throws IOException if the quota is exceeded, and logs a statement
+   */
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException {
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    boolean proxyUserCheck = true;
+    int proxyQuotaIncrement;
+    Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased
+    StringBuilder requesterMessage = new StringBuilder();
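+    // Record this dag as running so that releaseQuota() decrements the counts at most once per dag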
+    runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
+
+    if (proxyUser != null) {
+      proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check succeeds
+      if (!proxyUserCheck) {
+        // add 1 to proxyQuotaIncrement since the count starts at 0, and is negative if quota is exceeded
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+      }
+    }
+
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    boolean requesterCheck = true;
+
+    if (serializedRequesters != null) {
+      List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
+          .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+      for (String requester : uniqueRequesters) {
+        int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
+        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check succeeds
+        usersQuotaIncrement.add(requester);
+        requesterCheck = requesterCheck && thisRequesterCheck;
+        if (!thisRequesterCheck) {
+          requesterMessage.append(String.format(
+              "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n",
+              requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) - getQuotaForUser(requester)));
+        }
+      }
+    }
+
+    // Throw errors for reached quotas at the end to avoid inconsistent job counts
+    if ((!proxyUserCheck || !requesterCheck) && !onInit) {
+      // roll back the increased counts in this block
+      String userKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+      decrementQuotaUsage(proxyUserToJobCount, userKey);
+      decrementQuotaUsageForUsers(usersQuotaIncrement);
+      runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+      throw new IOException(requesterMessage.toString());
+    }
+  }
+
+  /**
+   * Increment quota by one for the given map and key.
+   * @return a negative number if quota is already reached for this user
+   *         a positive number if the quota is not reached for this user
+   *         the absolute value of the number is the used quota before this increment request
+   *         0 if quota usage is not changed
+   */
+  private int incrementJobCountAndCheckUserQuota(Map<String, Integer> quotaMap, String user, Dag.DagNode<JobExecutionPlan> dagNode) {
+    String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
+
+    // Only increment job count for the first attempt, since the job is considered running between retries
+    if (dagNode.getValue().getCurrentAttempts() != 1) {
+      return 0;
+    }
+
+    Integer currentCount;
+    // Modifications must be thread safe since DAGs on DagManagerThreads may update the quota for the same user
+    do {
+      currentCount = quotaMap.get(key);
+    } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : !quotaMap.replace(key, currentCount, currentCount + 1));
+
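+    // A null currentCount means this thread performed the first insert above, so the count before the increment was 0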
+    if (currentCount == null) {
+      currentCount = 0;
+    }
+
+    if (currentCount >= getQuotaForUser(user)) {
+      return -currentCount; // increment must have crossed the quota
+    } else {
+      return currentCount;
+    }
+  }
+
+  /**
+   * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
+   * Returns true if the dag existed in the set of running dags and was removed successfully.
+   */
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
+    Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+    if (val == null) {
+      return false;
+    }
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    if (proxyUser != null) {
+      String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+      decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);

Review Comment:
   Ah, I see it now: it's thrown in `RequesterService.deserialize(serializedRequesters)`, so it's not needed on this line.

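For context, the constructor parses each element of the per-user quota list by splitting on the ":" separator. A minimal, standalone sketch of that parsing, with an illustrative class name and quota values (only the split/parse logic mirrors the PR):

```java
import com.google.common.collect.ImmutableMap;

import java.util.Arrays;
import java.util.Map;

// Standalone sketch of the constructor's per-user quota parsing:
// each "<user>:<quota>" entry becomes one map entry.
public class PerUserQuotaParseSketch {
  public static void main(String[] args) {
    // Illustrative values; in the PR these come from the PER_USER_QUOTA config list.
    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
    for (String userQuota : Arrays.asList("someUser:2", "otherUser:5")) {
      mapBuilder.put(userQuota.split(":")[0], Integer.parseInt(userQuota.split(":")[1]));
    }
    Map<String, Integer> perUserQuota = mapBuilder.build();
    System.out.println(perUserQuota); // prints {someUser=2, otherUser=5}
  }
}
```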

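The retry loop in `incrementJobCountAndCheckUserQuota` is a lock-free compare-and-swap idiom over `ConcurrentHashMap`. A minimal sketch of the same pattern, under illustrative names:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the putIfAbsent/replace retry loop: retry until this thread either
// wins the initial insert or successfully swaps current for current + 1.
public class CasCounterSketch {
  private final Map<String, Integer> counts = new ConcurrentHashMap<>();

  /** Atomically increments the count for key, returning the count seen before the increment. */
  public int increment(String key) {
    Integer current;
    do {
      current = counts.get(key);
    } while (current == null
        ? counts.putIfAbsent(key, 1) != null            // another thread inserted first; retry
        : !counts.replace(key, current, current + 1));  // value changed underneath us; retry
    return current == null ? 0 : current;
  }
}
```

`ConcurrentHashMap.merge` would give the same atomicity in a single call; the explicit loop is a stylistic choice that keeps the pre-increment count in hand for the quota comparison.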

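Note also the shape of `checkQuota`: it increments every relevant counter first, and only at the end rolls all increments back and throws if any check failed, matching the in-code comment about avoiding inconsistent job counts. A simplified sketch of that acquire-then-rollback shape, with illustrative names and a flat quota:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch: increment all counters, check them, and roll everything
// back before throwing so the counts stay consistent on failure.
public class QuotaRollbackSketch {
  private final Map<String, Integer> counts = new ConcurrentHashMap<>();
  private final int quota = 2; // illustrative flat quota for every user

  public void acquireOrThrow(List<String> users) throws IOException {
    List<String> incremented = new ArrayList<>();
    boolean withinQuota = true;
    for (String user : users) {
      int before = counts.merge(user, 1, Integer::sum) - 1; // count before this increment
      incremented.add(user);
      withinQuota &= before < quota;
    }
    if (!withinQuota) {
      for (String user : incremented) {
        counts.merge(user, -1, Integer::sum); // roll back this request's increments
      }
      throw new IOException("quota exceeded");
    }
  }
}
```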


Issue Time Tracking
-------------------

    Worklog Id:     (was: 755439)
    Time Spent: 2h 40m  (was: 2.5h)

> Gobblin as a Service does not emit correct running job metrics and quotas in 
> some edge cases
> --------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1624
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1624
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> With the DagManager class in GaaS, a rollout or leader swap can produce an 
> inaccurate count of running jobs in emitted metrics, and inaccurate quotas 
> for those running jobs.
> For example, if the leader is shut down while tracking 10 running jobs, and 
> 5 of those jobs complete during the restart, the leader would emit that 0 
> jobs are currently running, since it does not treat the job counters as 
> idempotent. Additionally, we over-decrement because we do not differentiate 
> jobs that fail on the executor from jobs that fail on the GaaS side.
> We should track currently running jobs more reliably, ensuring that we only 
> decrement counters/quotas for jobs that are actually running on the executor, 
> and carry that state correctly across restarts.

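A minimal sketch of the idempotency guard the patch introduces with its `runningDagIds` map: a release only decrements when the dag was actually tracked as running, so duplicate completions or post-restart replays become no-ops. Names and the single-user signature are illustrative:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: gate quota release on membership in the running set so a second
// release of the same dag id cannot over-decrement the counters.
public class IdempotentReleaseSketch {
  private final Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
  private final Map<String, Integer> jobCounts = new ConcurrentHashMap<>();

  public void acquire(String dagId, String user) {
    runningDagIds.put(dagId, true);
    jobCounts.merge(user, 1, Integer::sum);
  }

  public boolean release(String dagId, String user) {
    if (runningDagIds.remove(dagId) == null) {
      return false; // not tracked as running (or already released): skip the decrement
    }
    jobCounts.merge(user, -1, Integer::sum);
    return true;
  }
}
```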


--
This message was sent by Atlassian Jira
(v8.20.1#820001)
