[
https://issues.apache.org/jira/browse/GOBBLIN-1691?focusedWorklogId=803798&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-803798
]
ASF GitHub Bot logged work on GOBBLIN-1691:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Aug/22 22:07
Start Date: 25/Aug/22 22:07
Worklog Time Spent: 10m
Work Description: arjun4084346 commented on code in PR #3545:
URL: https://github.com/apache/gobblin/pull/3545#discussion_r955460218
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An abstract implementation of {@link UserQuotaManager} that
+ */
+@Slf4j
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
+ public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
+ public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+ public static final String QUOTA_SEPERATOR = ":";
+ public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+ private final Map<String, Integer> perUserQuota;
+ private final Map<String, Integer> perFlowGroupQuota;
+ Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+ private final int defaultQuota;
+
+ public AbstractUserQuotaManager(Config config) {
+ this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
+ ImmutableMap.Builder<String, Integer> userMapBuilder =
ImmutableMap.builder();
+ ImmutableMap.Builder<String, Integer> flowGroupMapBuilder =
ImmutableMap.builder();
+ // Quotas will take form of user:<Quota> and flowGroup:<Quota>
+ for (String flowGroupQuota : ConfigUtils.getStringList(config,
PER_FLOWGROUP_QUOTA)) {
+ flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ // Keep quotas per user as well in form user:<Quota> which apply for all
flowgroups
+ for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA))
{
+ userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+ this.perUserQuota = userMapBuilder.build();
+ this.perFlowGroupQuota = flowGroupMapBuilder.build();
+ }
+
+ abstract int incrementJobCount(String key, CountType countType) throws
IOException;
+
+ abstract void decrementJobCount(String user, CountType countType) throws
IOException;
+
+ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean
onInit) throws IOException {
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ if (isDagCurrentlyRunning(dagNode)) {
+ return;
+ }
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+ boolean proxyUserCheck = true;
+ Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for
which quota is increased
+ StringBuilder requesterMessage = new StringBuilder();
+ runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since count starts at 0, and is
negative if quota is exceeded
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (serializedRequesters != null &&
dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(requester, dagNode),
getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
+ usersQuotaIncrement.add(requester);
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for requester %s on executor %s : quota=%s,
requests above quota=%d%n",
+ requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
+ }
+ }
+ }
+
+ boolean flowGroupCheck = true;
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
getQuotaForFlowGroup(flowGroup), CountType.FLOW_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format("Quota exceeded for flowgroup %s
on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+ Math.abs(flowGroupQuotaIncrement) + 1 -
getQuotaForFlowGroup(flowGroup)));
+ }
+ }
+
+ // Throw errors for reach quota at the end to avoid inconsistent job counts
Review Comment:
Thanks, though it was probably originally added by Jack (not sure) before a
couple of refactorings.
Issue Time Tracking
-------------------
Worklog Id: (was: 803798)
Time Spent: 1h (was: 50m)
> add a mysql based user quota manager
> ------------------------------------
>
> Key: GOBBLIN-1691
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1691
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> A MySQL-based user quota manager can share usage data with other instances
> of Gobblin Service.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)