This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fa97ff5d4 add MysqlUserQuotaManager (#3545)
fa97ff5d4 is described below
commit fa97ff5d4411c6edb349a05b7c15adcfb8e54b9c
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Sep 6 13:07:03 2022 -0700
add MysqlUserQuotaManager (#3545)
fix unit test and address review comments
address review comments
merge conflicts
---
.../apache/gobblin/service/ServiceConfigKeys.java | 4 +
.../testing/TestMetastoreDatabaseServer.java | 2 +-
.../modules/core/GobblinServiceGuiceModule.java | 4 +-
...aManager.java => AbstractUserQuotaManager.java} | 221 +++++++++---------
.../service/modules/orchestration/DagManager.java | 15 +-
.../modules/orchestration/DagManagerUtils.java | 5 +
.../orchestration/InMemoryUserQuotaManager.java | 119 ++++++++++
.../orchestration/MysqlUserQuotaManager.java | 259 +++++++++++++++++++++
.../modules/orchestration/UserQuotaManager.java | 217 ++---------------
.../scheduler/GobblinServiceJobScheduler.java | 5 +-
.../gobblin/service/GobblinServiceManagerTest.java | 5 +-
.../modules/orchestration/DagManagerTest.java | 4 +-
...Test.java => InMemoryUserQuotaManagerTest.java} | 35 ++-
.../orchestration/MysqlUserQuotaManagerTest.java | 138 +++++++++++
.../scheduler/GobblinServiceJobSchedulerTest.java | 14 +-
15 files changed, 692 insertions(+), 355 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index dc149e20b..dbb9ef072 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -142,6 +142,10 @@ public class ServiceConfigKeys {
public static final String FORCE_LEADER = GOBBLIN_SERVICE_PREFIX +
"forceLeader";
public static final boolean DEFAULT_FORCE_LEADER = false;
+
+ public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX +
"quotaManager.class";
+ public static final String DEFAULT_QUOTA_MANAGER =
"org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager";
+
// Group Membership authentication service
public static final String GROUP_OWNERSHIP_SERVICE_CLASS =
GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE =
"org.apache.gobblin.service.NoopGroupOwnershipService";
diff --git
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
index 77786bd45..ebbd0ad98 100644
---
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
+++
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
@@ -86,7 +86,7 @@ class TestMetastoreDatabaseServer implements Closeable {
this.dbHost = this.embeddedMysqlEnabled ? "localhost" :
realConfig.getString(DBHOST_KEY);
this.dbPort = this.embeddedMysqlEnabled ? new
PortUtils.ServerSocketPortLocator().random() : realConfig.getInt(DBPORT_KEY);
- this.log.error("Starting with config: embeddedMysqlEnabled={}
dbUserName={} dbHost={} dbPort={}",
+ this.log.info("Starting with config: embeddedMysqlEnabled={} dbUserName={}
dbHost={} dbPort={}",
this.embeddedMysqlEnabled,
this.dbUserName,
this.dbHost,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 653354d45..310073a8c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -200,7 +200,9 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(SchedulerService.class);
binder.bind(GobblinServiceJobScheduler.class);
OptionalBinder.newOptionalBinder(binder, UserQuotaManager.class);
- binder.bind(UserQuotaManager.class);
+ binder.bind(UserQuotaManager.class)
+ .to(getClassByNameOrAlias(UserQuotaManager.class,
serviceConfig.getInnerConfig(),
+ ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER));
}
if (serviceConfig.isGitConfigMonitorEnabled()) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
similarity index 50%
copy from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
copy to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index 36b4d2718..8d6cab041 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -14,51 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.gobblin.service.modules.orchestration;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
import java.io.IOException;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import javax.inject.Singleton;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
-import org.apache.gobblin.service.RequesterService;
-import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
/**
- * Manages the statically configured user quotas for both the proxy user in
user.to.proxy configuration and the API requester(s)
- * Is used by the dag manager to ensure that the number of currently running
jobs do not exceed the quota, if the quota
- * is exceeded, then the execution will fail without running on the underlying
executor
+ * An abstract implementation of {@link UserQuotaManager} that has base
implementation of checking quota and increasing/decreasing it.
*/
@Slf4j
-@Singleton
-public class UserQuotaManager {
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
public static final String QUOTA_SEPERATOR = ":";
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
- private final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
- private final Map<String, Integer> flowGroupToJobCount = new
ConcurrentHashMap<>();
- private final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
private final Map<String, Integer> perUserQuota;
private final Map<String, Integer> perFlowGroupQuota;
- Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
+ // TODO : we might want to make this field implementation specific to be
able to decide if the dag is already running or have been accepted
+ Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
private final int defaultQuota;
- @Inject
- public UserQuotaManager(Config config) {
+ public AbstractUserQuotaManager(Config config) {
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
ImmutableMap.Builder<String, Integer> userMapBuilder =
ImmutableMap.builder();
ImmutableMap.Builder<String, Integer> flowGroupMapBuilder =
ImmutableMap.builder();
@@ -74,151 +69,147 @@ public class UserQuotaManager {
this.perFlowGroupQuota = flowGroupMapBuilder.build();
}
- /**
- * Checks if the dagNode exceeds the statically configured user quota for
both the proxy user, requester user, and flowGroup
- * @throws QuotaExceededException if the quota is exceeded, and logs a
statement
- */
- public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean
onInit) throws QuotaExceededException {
+ // Implementations should return the current count and increase them by one
+ abstract int incrementJobCount(String key, CountType countType) throws
IOException;
+
+ abstract void decrementJobCount(String key, CountType countType) throws
IOException;
+
+ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException {
+ QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode);
+
+ // Throw errors for reach quota at the end to avoid inconsistent job counts
+ if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck ||
!quotaCheck.flowGroupCheck)) {
+ // roll back the increased counts in this block
+ rollbackIncrements(dagNode);
+ throw new QuotaExceededException(quotaCheck.requesterMessage);
+ }
+ }
+
+ private void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
+ List<String> usersQuotaIncrement =
DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode));
+
+ decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
CountType.USER_COUNT);
+ decrementQuotaUsageForUsers(usersQuotaIncrement);
+ decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup,
dagNode), CountType.FLOWGROUP_COUNT);
+ runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+ }
+
+ protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan>
dagNode) throws IOException {
+ QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
// Dag is already being tracked, no need to double increment for retries
and multihop flows
- if (isDagCurrentlyRunning(dagNode)) {
- return;
+ if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode))) {
+ return quotaCheck;
+ } else {
+ runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
}
+
String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
- boolean proxyUserCheck = true;
- Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for
which quota is increased
StringBuilder requesterMessage = new StringBuilder();
- runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
- if (proxyUser != null) {
+
+ boolean proxyUserCheck;
+
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
proxyUserToJobCount, dagNode, getQuotaForUser(proxyUser));
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ quotaCheck.setProxyUserCheck(proxyUserCheck);
if (!proxyUserCheck) {
- // add 1 to proxyUserIncrement since count starts at 0, and is
negative if quota is exceeded
+ // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count
before the increment
requesterMessage.append(String.format(
"Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
- proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
}
}
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
- if (serializedRequesters != null) {
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
for (String requester : uniqueRequesters) {
int userQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getUserQuotaKey(requester, dagNode),
requesterToJobCount, dagNode, getQuotaForUser(requester));
+ DagManagerUtils.getUserQuotaKey(requester, dagNode),
getQuotaForUser(requester), CountType.REQUESTER_COUNT);
boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
- usersQuotaIncrement.add(requester);
requesterCheck = requesterCheck && thisRequesterCheck;
+ quotaCheck.setRequesterCheck(requesterCheck);
if (!thisRequesterCheck) {
requesterMessage.append(String.format(
- "Quota exceeded for requester %s on executor %s : quota=%s,
requests above quota=%d%n",
- requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
+ "Quota exceeded for requester %s on executor %s : quota=%s,
requests above quota=%d%n. ",
+ requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
}
}
}
- int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
- boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
- if (!flowGroupCheck) {
- requesterMessage.append(String.format(
- "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests
above quota=%d%n",
- flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
- }
-
- // Throw errors for reach quota at the end to avoid inconsistent job counts
- if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
- // roll back the increased counts in this block
- decrementQuotaUsage(proxyUserToJobCount,
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
- decrementQuotaUsage(flowGroupToJobCount,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
- decrementQuotaUsageForUsers(usersQuotaIncrement);
- runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
- throw new QuotaExceededException(requesterMessage.toString());
- }
- }
-
- /**
- * Increment quota by one for the given map and key.
- * @return a negative number if quota is already reached
- * a positive number if the quota is not reached
- * the absolute value of the number is the used quota before this
increment request
- * 0 if quota usage is not changed
- */
- private int incrementJobCountAndCheckQuota(String key, Map<String, Integer>
quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
- // Only increment job count for first attempt, since job is considered
running between retries
- // Include the scenario where currentAttempts is 0 (when checked by the
scheduler)
- // but it will not double increment due to first ensuring that the dag is
not already incremented
- if (dagNode.getValue().getCurrentAttempts() > 1) {
- return 0;
+ boolean flowGroupCheck;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ quotaCheck.setFlowGroupCheck(flowGroupCheck);
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format("Quota exceeded for flowgroup %s
on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+ Math.abs(flowGroupQuotaIncrement) + 1 -
getQuotaForFlowGroup(flowGroup)));
+ }
}
- Integer currentCount;
- // Modifications must be thread safe since DAGs on DagManagerThreads may
update the quota for the same user
- do {
- currentCount = quotaMap.get(key);
- } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null :
!quotaMap.replace(key, currentCount, currentCount + 1));
-
- if (currentCount == null) {
- currentCount = 0;
- }
+ quotaCheck.setRequesterMessage(requesterMessage.toString());
- if (currentCount >= quotaForKey) {
- return -currentCount; // increment must have crossed the quota
- } else {
- return currentCount;
- }
+ return quotaCheck;
}
/**
* Decrement the quota by one for the proxy user and requesters
corresponding to the provided {@link Dag.DagNode}.
* Returns true if the dag existed in the set of running dags and was
removed successfully
*/
- public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
- Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
- if (val == null) {
+ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException {
+ boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+ if (!val) {
return false;
}
+
String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
if (proxyUser != null) {
String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser,
dagNode);
- decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
+ decrementJobCount(proxyUserKey, CountType.USER_COUNT);
}
+
String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
- decrementQuotaUsage(flowGroupToJobCount,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
+ decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup,
dagNode), CountType.FLOWGROUP_COUNT);
+
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
- if (serializedRequesters != null) {
- try {
- for (ServiceRequester requester :
RequesterService.deserialize(serializedRequesters)) {
- String requesterKey =
DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
- decrementQuotaUsage(requesterToJobCount, requesterKey);
- }
- } catch (IOException e) {
- log.error("Failed to release quota for requester list " +
serializedRequesters, e);
- return false;
+ try {
+ for (String requester :
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+ String requesterKey = DagManagerUtils.getUserQuotaKey(requester,
dagNode);
+ decrementJobCount(requesterKey, CountType.REQUESTER_COUNT);
}
+ } catch (IOException e) {
+ log.error("Failed to release quota for requester list " +
serializedRequesters, e);
+ return false;
}
+
return true;
}
- private void decrementQuotaUsage(Map<String, Integer> quotaMap, String key) {
- Integer currentCount;
- if (key == null) {
- return;
+ private int incrementJobCountAndCheckQuota(String key, int keyQuota,
CountType countType) throws IOException {
+ int currentCount = incrementJobCount(key, countType);
+ if (currentCount >= keyQuota) {
+ return -currentCount;
+ } else {
+ return currentCount;
}
- do {
- currentCount = quotaMap.get(key);
- } while (currentCount != null && currentCount > 0 &&
!quotaMap.replace(key, currentCount, currentCount - 1));
}
- private void decrementQuotaUsageForUsers(Set<String>
requestersToDecreaseCount) {
+ private void decrementQuotaUsageForUsers(List<String>
requestersToDecreaseCount) throws IOException {
for (String requester : requestersToDecreaseCount) {
- decrementQuotaUsage(requesterToJobCount, requester);
+ decrementJobCount(requester, CountType.REQUESTER_COUNT);
}
}
@@ -230,8 +221,18 @@ public class UserQuotaManager {
return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
}
- private boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode)
{
- return
this.runningDagIds.containsKey(DagManagerUtils.generateDagId(dagNode));
+ @Setter
+ @AllArgsConstructor
+ protected static class QuotaCheck {
+ boolean proxyUserCheck;
+ boolean requesterCheck;
+ boolean flowGroupCheck;
+ String requesterMessage;
}
-}
+ protected enum CountType {
+ USER_COUNT,
+ REQUESTER_COUNT,
+ FLOWGROUP_COUNT
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f423277cf..3543c0e16 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -350,16 +350,9 @@ public class DagManager extends AbstractIdleService {
this.dagManagerMetrics.activate();
- UserQuotaManager quotaManager = new UserQuotaManager(config);
- // Before initializing the DagManagerThreads check which dags are
currently running before shutdown
- for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
- for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
- if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
- // Add all the currently running Dags to the quota limit per user
- quotaManager.checkQuota(dagNode, true);
- }
- }
- }
+ UserQuotaManager quotaManager = new InMemoryUserQuotaManager(config);
+ quotaManager.init(dagStateStore.getDags());
+
//On startup, the service creates DagManagerThreads that are scheduled
at a fixed rate.
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
@@ -911,7 +904,7 @@ public class DagManager extends AbstractIdleService {
// Run this spec on selected executor
SpecProducer<Spec> producer;
try {
- quotaManager.checkQuota(dagNode, false);
+ quotaManager.checkQuota(dagNode);
producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) :
null;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 9d59a9085..dc79524b3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -345,6 +346,10 @@ public class DagManagerUtils {
}
static List<String> getDistinctUniqueRequesters(String serializedRequesters)
{
+ if (serializedRequesters == null) {
+ return Collections.emptyList();
+ }
+
List<String> uniqueRequesters;
try {
uniqueRequesters = RequesterService.deserialize(serializedRequesters)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
new file mode 100644
index 000000000..438e86d20
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * An implementation of {@link UserQuotaManager} that stores quota usage in
memory.
+ */
+@Slf4j
+@Singleton
+public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
+ private final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
+ private final Map<String, Integer> flowGroupToJobCount = new
ConcurrentHashMap<>();
+ private final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
+
+ @Inject
+ public InMemoryUserQuotaManager(Config config) {
+ super(config);
+ }
+
+ public void init(Collection<Dag<JobExecutionPlan>> dags) throws IOException {
+ for (Dag<JobExecutionPlan> dag : dags) {
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+ // Add all the currently running Dags to the quota limit per user
+ increaseAndCheckQuota(dagNode);
+ }
+ }
+ }
+ }
+
+ private int incrementJobCount(String key, Map<String, Integer> quotaMap) {
+ Integer currentCount;
+ // Modifications must be thread safe since DAGs on DagManagerThreads may
update the quota for the same user
+ do {
+ currentCount = quotaMap.get(key);
+ } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null :
!quotaMap.replace(key, currentCount, currentCount + 1));
+
+ if (currentCount == null) {
+ currentCount = 0;
+ }
+
+ return currentCount;
+ }
+
+ private void decrementJobCount(String key, Map<String, Integer> quotaMap) {
+ Integer currentCount;
+ if (key == null) {
+ return;
+ }
+ do {
+ currentCount = quotaMap.get(key);
+ } while (currentCount != null && currentCount > 0 &&
!quotaMap.replace(key, currentCount, currentCount - 1));
+
+ if (currentCount == null || currentCount == 0) {
+ log.warn("Decrement job count was called for " + key + " when the count
was already zero/absent.");
+ }
+ }
+
+ @Override
+ int incrementJobCount(String user, CountType countType) throws IOException {
+ switch (countType) {
+ case USER_COUNT:
+ return incrementJobCount(user, proxyUserToJobCount);
+ case REQUESTER_COUNT:
+ return incrementJobCount(user, requesterToJobCount);
+ case FLOWGROUP_COUNT:
+ return incrementJobCount(user, flowGroupToJobCount);
+ default:
+ throw new IOException("Invalid count type " + countType);
+ }
+ }
+
+ @Override
+ void decrementJobCount(String user, CountType countType) throws IOException {
+ switch (countType) {
+ case USER_COUNT:
+ decrementJobCount(user, proxyUserToJobCount);
+ break;
+ case REQUESTER_COUNT:
+ decrementJobCount(user, requesterToJobCount);
+ break;
+ case FLOWGROUP_COUNT:
+ decrementJobCount(user, flowGroupToJobCount);
+ break;
+ default:
+ throw new IOException("Invalid count type " + countType);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
new file mode 100644
index 000000000..78f15cfba
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import javax.inject.Singleton;
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An implementation of {@link UserQuotaManager} that stores quota usage in
mysql.
+ */
+@Slf4j
+@Singleton
+public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
+ private final MysqlQuotaStore mysqlStore;
+
+ @Inject
+ public MysqlUserQuotaManager(Config config) throws IOException {
+ super(config);
+ this.mysqlStore = createQuotaStore(config);
+ }
+
+ // This implementation does not need to update quota usage when the service
restarts or it's leadership status changes
+ public void init(Collection<Dag<JobExecutionPlan>> dags) {
+ }
+
+ @Override
+ int incrementJobCount(String user, CountType countType) throws IOException {
+ try {
+ return this.mysqlStore.increaseCount(user, countType);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ void decrementJobCount(String user, CountType countType) throws IOException {
+ try {
+ this.mysqlStore.decreaseCount(user, countType);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @VisibleForTesting
+ int getCount(String name, CountType countType) throws IOException {
+ return this.mysqlStore.getCount(name, countType);
+ }
+
+ /**
+ * Creating an instance of StateStore.
+ */
+ protected MysqlQuotaStore createQuotaStore(Config config) throws IOException
{
+ String stateStoreTableName = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+
+ BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
+
+ return new MysqlQuotaStore(basicDataSource, stateStoreTableName);
+ }
+
+ static class MysqlQuotaStore {
+ protected final DataSource dataSource;
+ final String tableName;
+ private final String GET_USER_COUNT;
+ private final String GET_REQUESTER_COUNT;
+ private final String GET_FLOWGROUP_COUNT;
+ private final String INCREASE_USER_COUNT_SQL;
+ private final String INCREASE_REQUESTER_COUNT_SQL;
+ private final String INCREASE_FLOW_COUNT_SQL;
+ private final String DECREASE_USER_COUNT_SQL;
+ private final String DECREASE_REQUESTER_COUNT_SQL;
+ private final String DECREASE_FLOWGROUP_COUNT_SQL;
+ private final String DELETE_USER_SQL;
+
+ public MysqlQuotaStore(BasicDataSource dataSource, String tableName)
+ throws IOException {
+ this.dataSource = dataSource;
+ this.tableName = tableName;
+
+ GET_USER_COUNT = "SELECT user_count FROM " + tableName + " WHERE name =
? FOR UPDATE";
+ GET_REQUESTER_COUNT = "SELECT requester_count FROM " + tableName + "
WHERE name = ? FOR UPDATE";
+ GET_FLOWGROUP_COUNT = "SELECT flowgroup_count FROM " + tableName + "
WHERE name = ? FOR UPDATE";
+ INCREASE_USER_COUNT_SQL = "INSERT INTO " + tableName + " (name,
user_count) VALUES (?, 1) "
+ + "ON DUPLICATE KEY UPDATE user_count=user_count+1";
+ INCREASE_REQUESTER_COUNT_SQL = "INSERT INTO " + tableName + " (name,
requester_count) VALUES (?, 1) "
+ + "ON DUPLICATE KEY UPDATE requester_count=requester_count+1";
+ INCREASE_FLOW_COUNT_SQL = "INSERT INTO " + tableName + " (name,
flowgroup_count) VALUES (?, 1) "
+ + "ON DUPLICATE KEY UPDATE flowgroup_count=flowgroup_count+1";
+ DECREASE_USER_COUNT_SQL = "UPDATE " + tableName + " SET
user_count=GREATEST(0, user_count-1) WHERE name = ?";
+ DECREASE_REQUESTER_COUNT_SQL = "UPDATE " + tableName + " SET
requester_count=GREATEST(0, requester_count-1) WHERE name = ?";
+ DECREASE_FLOWGROUP_COUNT_SQL = "UPDATE " + tableName + " SET
flowgroup_count=flowgroup_count-1 WHERE name = ?";
+ DELETE_USER_SQL = "DELETE FROM " + tableName + " WHERE name = ? AND
user_count<1 AND flowgroup_count<1";
+
+ String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + "
(name VARCHAR(20) CHARACTER SET latin1 NOT NULL, "
+ + "user_count INT NOT NULL DEFAULT 0, requester_count INT NOT NULL
DEFAULT 0, flowgroup_count INT NOT NULL DEFAULT 0, "
+ + "PRIMARY KEY (name), " + "UNIQUE INDEX ind (name))";
+ try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement =
connection.prepareStatement(createQuotaTable)) {
+ createStatement.executeUpdate();
+ } catch (SQLException e) {
+ throw new IOException("Failure creation table " + tableName, e);
+ }
+ }
+
+ /**
+ * returns count of countType for the name. if the row does not exist,
returns zero.
+ */
+ @VisibleForTesting
+ int getCount(String name, CountType countType) throws IOException {
+ String selectStatement = countType == CountType.USER_COUNT ?
GET_USER_COUNT : GET_FLOWGROUP_COUNT;
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement queryStatement =
connection.prepareStatement(selectStatement)) {
+ queryStatement.setString(1, name);
+ try (ResultSet rs = queryStatement.executeQuery()) {
+ if (rs.next()) {
+ return rs.getInt(1);
+ } else {
+ return -1;
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("failure retrieving count from user/flowGroup "
+ name, e);
+ }
+ }
+
+ public int increaseCount(String name, CountType countType) throws
IOException, SQLException {
+ Connection connection = dataSource.getConnection();
+ connection.setAutoCommit(false);
+
+ String selectStatement;
+ String increaseStatement;
+
+ switch(countType) {
+ case USER_COUNT:
+ selectStatement = GET_USER_COUNT;
+ increaseStatement = INCREASE_USER_COUNT_SQL;
+ break;
+ case REQUESTER_COUNT:
+ selectStatement = GET_REQUESTER_COUNT;
+ increaseStatement = INCREASE_REQUESTER_COUNT_SQL;
+ break;
+ case FLOWGROUP_COUNT:
+ selectStatement = GET_FLOWGROUP_COUNT;
+ increaseStatement = INCREASE_FLOW_COUNT_SQL;
+ break;
+ default:
+ throw new IOException("Invalid count type " + countType);
+ }
+
+ ResultSet rs = null;
+ try (PreparedStatement statement1 =
connection.prepareStatement(selectStatement);
+ PreparedStatement statement2 =
connection.prepareStatement(increaseStatement)) {
+ statement1.setString(1, name);
+ statement2.setString(1, name);
+ rs = statement1.executeQuery();
+ statement2.executeUpdate();
+ connection.commit();
+ if (rs != null && rs.next()) {
+ return rs.getInt(1);
+ } else {
+ return 0;
+ }
+ } catch (SQLException e) {
+ connection.rollback();
+ throw new IOException("Failure increasing count for user/flowGroup " +
name, e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ connection.close();
+ }
+ }
+
+ public void decreaseCount(String name, CountType countType) throws
IOException, SQLException {
+ Connection connection = dataSource.getConnection();
+ connection.setAutoCommit(false);
+
+ String selectStatement;
+ String decreaseStatement;
+
+ switch(countType) {
+ case USER_COUNT:
+ selectStatement = GET_USER_COUNT;
+ decreaseStatement = DECREASE_USER_COUNT_SQL;
+ break;
+ case REQUESTER_COUNT:
+ selectStatement = GET_REQUESTER_COUNT;
+ decreaseStatement = DECREASE_REQUESTER_COUNT_SQL;
+ break;
+ case FLOWGROUP_COUNT:
+ selectStatement = GET_FLOWGROUP_COUNT;
+ decreaseStatement = DECREASE_FLOWGROUP_COUNT_SQL;
+ break;
+ default:
+ throw new IOException("Invalid count type " + countType);
+ }
+
+ ResultSet rs = null;
+ try (
+ PreparedStatement statement1 =
connection.prepareStatement(selectStatement);
+ PreparedStatement statement2 =
connection.prepareStatement(decreaseStatement);
+ PreparedStatement statement3 =
connection.prepareStatement(DELETE_USER_SQL)) {
+ statement1.setString(1, name);
+ statement2.setString(1, name);
+ statement3.setString(1, name);
+ rs = statement1.executeQuery();
+ statement2.executeUpdate();
+ statement3.executeUpdate();
+ connection.commit();
+ if (rs != null && rs.next() && rs.getInt(1) == 0) {
+ log.warn("Decrement job count was called for " + name + " when the
count was already zero/absent.");
+ }
+ } catch (SQLException e) {
+ connection.rollback();
+ throw new IOException("Failure decreasing count from user/flowGroup "
+ name, e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ connection.close();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index 36b4d2718..752956d11 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -14,224 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.gobblin.service.modules.orchestration;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.inject.Singleton;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import java.util.Collection;
+
import org.apache.gobblin.exception.QuotaExceededException;
-import org.apache.gobblin.service.RequesterService;
-import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.util.ConfigUtils;
-
/**
- * Manages the statically configured user quotas for both the proxy user in
user.to.proxy configuration and the API requester(s)
- * Is used by the dag manager to ensure that the number of currently running
jobs do not exceed the quota, if the quota
- * is exceeded, then the execution will fail without running on the underlying
executor
+ * Manages the statically configured user quotas for the proxy user in
user.to.proxy configuration, the API requester(s)
+ * and the flow group.
+ * It is used by the {@link DagManager} to ensure that the number of currently
running jobs do not exceed the quota, if
+ * the quota is exceeded, the execution will fail without running on the
underlying executor.
*/
-@Slf4j
-@Singleton
-public class UserQuotaManager {
- public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
- public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
- public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
- public static final String QUOTA_SEPERATOR = ":";
- public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
- private final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
- private final Map<String, Integer> flowGroupToJobCount = new
ConcurrentHashMap<>();
- private final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
- private final Map<String, Integer> perUserQuota;
- private final Map<String, Integer> perFlowGroupQuota;
- Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
- private final int defaultQuota;
-
- @Inject
- public UserQuotaManager(Config config) {
- this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
- ImmutableMap.Builder<String, Integer> userMapBuilder =
ImmutableMap.builder();
- ImmutableMap.Builder<String, Integer> flowGroupMapBuilder =
ImmutableMap.builder();
- // Quotas will take form of user:<Quota> and flowGroup:<Quota>
- for (String flowGroupQuota : ConfigUtils.getStringList(config,
PER_FLOWGROUP_QUOTA)) {
- flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
- }
- // Keep quotas per user as well in form user:<Quota> which apply for all
flowgroups
- for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA))
{
- userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
- }
- this.perUserQuota = userMapBuilder.build();
- this.perFlowGroupQuota = flowGroupMapBuilder.build();
- }
+public interface UserQuotaManager {
/**
- * Checks if the dagNode exceeds the statically configured user quota for
both the proxy user, requester user, and flowGroup
- * @throws QuotaExceededException if the quota is exceeded, and logs a
statement
+ * Initialize with the provided set of dags.
*/
- public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean
onInit) throws QuotaExceededException {
- // Dag is already being tracked, no need to double increment for retries
and multihop flows
- if (isDagCurrentlyRunning(dagNode)) {
- return;
- }
- String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
- String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
- ConfigurationKeys.FLOW_GROUP_KEY, "");
- String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
- boolean proxyUserCheck = true;
- Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for
which quota is increased
- StringBuilder requesterMessage = new StringBuilder();
- runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
- if (proxyUser != null) {
- int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
proxyUserToJobCount, dagNode, getQuotaForUser(proxyUser));
- proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
- if (!proxyUserCheck) {
- // add 1 to proxyUserIncrement since count starts at 0, and is
negative if quota is exceeded
- requesterMessage.append(String.format(
- "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
- proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
- }
- }
-
- String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
- boolean requesterCheck = true;
-
- if (serializedRequesters != null) {
- List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
- for (String requester : uniqueRequesters) {
- int userQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getUserQuotaKey(requester, dagNode),
requesterToJobCount, dagNode, getQuotaForUser(requester));
- boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
- usersQuotaIncrement.add(requester);
- requesterCheck = requesterCheck && thisRequesterCheck;
- if (!thisRequesterCheck) {
- requesterMessage.append(String.format(
- "Quota exceeded for requester %s on executor %s : quota=%s,
requests above quota=%d%n",
- requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
- }
- }
- }
-
- int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
- boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
- if (!flowGroupCheck) {
- requesterMessage.append(String.format(
- "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests
above quota=%d%n",
- flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
- }
-
- // Throw errors for reach quota at the end to avoid inconsistent job counts
- if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
- // roll back the increased counts in this block
- decrementQuotaUsage(proxyUserToJobCount,
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
- decrementQuotaUsage(flowGroupToJobCount,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
- decrementQuotaUsageForUsers(usersQuotaIncrement);
- runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
- throw new QuotaExceededException(requesterMessage.toString());
- }
- }
+ void init(Collection<Dag<JobExecutionPlan>> dags) throws IOException;
/**
- * Increment quota by one for the given map and key.
- * @return a negative number if quota is already reached
- * a positive number if the quota is not reached
- * the absolute value of the number is the used quota before this
increment request
- * 0 if quota usage is not changed
+ * Checks if the dagNode exceeds the statically configured user quota for
the proxy user, requester user and flowGroup.
+ * It also increases the quota usage for proxy user, requester and the
flowGroup of the given DagNode by one.
+ * @throws QuotaExceededException if the quota is exceeded
*/
- private int incrementJobCountAndCheckQuota(String key, Map<String, Integer>
quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
- // Only increment job count for first attempt, since job is considered
running between retries
- // Include the scenario where currentAttempts is 0 (when checked by the
scheduler)
- // but it will not double increment due to first ensuring that the dag is
not already incremented
- if (dagNode.getValue().getCurrentAttempts() > 1) {
- return 0;
- }
-
- Integer currentCount;
- // Modifications must be thread safe since DAGs on DagManagerThreads may
update the quota for the same user
- do {
- currentCount = quotaMap.get(key);
- } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null :
!quotaMap.replace(key, currentCount, currentCount + 1));
-
- if (currentCount == null) {
- currentCount = 0;
- }
-
- if (currentCount >= quotaForKey) {
- return -currentCount; // increment must have crossed the quota
- } else {
- return currentCount;
- }
- }
+ void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
/**
* Decrement the quota by one for the proxy user and requesters
corresponding to the provided {@link Dag.DagNode}.
- * Returns true if the dag existed in the set of running dags and was
removed successfully
+ * Returns true if successfully reduces the quota usage
*/
- public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
- Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
- if (val == null) {
- return false;
- }
- String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
- if (proxyUser != null) {
- String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser,
dagNode);
- decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
- }
- String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
- ConfigurationKeys.FLOW_GROUP_KEY, "");
- decrementQuotaUsage(flowGroupToJobCount,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
- String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
- if (serializedRequesters != null) {
- try {
- for (ServiceRequester requester :
RequesterService.deserialize(serializedRequesters)) {
- String requesterKey =
DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
- decrementQuotaUsage(requesterToJobCount, requesterKey);
- }
- } catch (IOException e) {
- log.error("Failed to release quota for requester list " +
serializedRequesters, e);
- return false;
- }
- }
- return true;
- }
-
- private void decrementQuotaUsage(Map<String, Integer> quotaMap, String key) {
- Integer currentCount;
- if (key == null) {
- return;
- }
- do {
- currentCount = quotaMap.get(key);
- } while (currentCount != null && currentCount > 0 &&
!quotaMap.replace(key, currentCount, currentCount - 1));
- }
-
- private void decrementQuotaUsageForUsers(Set<String>
requestersToDecreaseCount) {
- for (String requester : requestersToDecreaseCount) {
- decrementQuotaUsage(requesterToJobCount, requester);
- }
- }
-
- private int getQuotaForUser(String user) {
- return this.perUserQuota.getOrDefault(user, defaultQuota);
- }
-
- private int getQuotaForFlowGroup(String flowGroup) {
- return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
- }
-
- private boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode)
{
- return
this.runningDagIds.containsKey(DagManagerUtils.generateDagId(dagNode));
- }
-
+ boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index c8286740a..438ccff8d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -35,7 +35,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
@@ -336,8 +335,8 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
if (quotaManager.isPresent()) {
// QuotaManager has idempotent checks for a dagNode, so this check
won't double add quotas for a flow in the DagManager
try {
- quotaManager.get().checkQuota(dag.getNodes().get(0), false);
- } catch (QuotaExceededException e) {
+ quotaManager.get().checkQuota(dag.getNodes().get(0));
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index a66717c56..02f6f58de 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -28,8 +28,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+
+import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
import
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
-import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jgit.api.Git;
@@ -179,7 +180,7 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY,
false);
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
MockedSpecCompiler.class.getCanonicalName());
- serviceCoreProperties.put(UserQuotaManager.PER_USER_QUOTA, "testUser:1");
+ serviceCoreProperties.put(AbstractUserQuotaManager.PER_USER_QUOTA,
"testUser:1");
transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT,
"10000");
// Create a bare repository
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index b414a76e6..d85c708f4 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -99,8 +99,8 @@ public class DagManagerTest {
this.resumeQueue = new LinkedBlockingQueue<>();
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
Config quotaConfig = ConfigFactory.empty()
- .withValue(UserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1"));
- this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
+ .withValue(AbstractUserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1"));
+ this._gobblinServiceQuotaManager = new
InMemoryUserQuotaManager(quotaConfig);
this._dagManagerMetrics = new DagManagerMetrics(metricContext);
this._dagManagerMetrics.activate();
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
_failedDagStateStore, queue, cancelQueue,
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
similarity index 83%
rename from
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
rename to
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
index 9fc9438db..5e63a0f00 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
@@ -29,16 +29,16 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class UserQuotaManagerTest {
+public class InMemoryUserQuotaManagerTest {
- UserQuotaManager _quotaManager;
+ InMemoryUserQuotaManager _quotaManager;
@BeforeClass
public void setUp() {
Config quotaConfig = ConfigFactory.empty()
- .withValue(UserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
- .withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA,
ConfigValueFactory.fromAnyRef("group1:1,group2:2"));
- this._quotaManager = new UserQuotaManager(quotaConfig);
+ .withValue(AbstractUserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
+ .withValue(AbstractUserQuotaManager.PER_FLOWGROUP_QUOTA,
ConfigValueFactory.fromAnyRef("group1:1,group2:2"));
+ this._quotaManager = new InMemoryUserQuotaManager(quotaConfig);
}
// Tests that if exceeding the quota on startup, do not throw an exception
and do not decrement the counter
@@ -49,9 +49,8 @@ public class UserQuotaManagerTest {
dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), true);
// Should not be throwing the exception
- this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), true);
+ this._quotaManager.init(dags);
}
@Test
@@ -62,9 +61,9 @@ public class UserQuotaManagerTest {
dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+ this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
Assert.assertThrows(IOException.class, () -> {
- this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false);
+ this._quotaManager.checkQuota(dags.get(1).getNodes().get(0));
});
}
@@ -77,7 +76,7 @@ public class UserQuotaManagerTest {
dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+ this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
Assert.assertTrue(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
Assert.assertFalse(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
}
@@ -92,9 +91,9 @@ public class UserQuotaManagerTest {
dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+ this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
Assert.assertThrows(IOException.class, () -> {
- this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false);
+ this._quotaManager.checkQuota(dags.get(1).getNodes().get(0));
});
}
@@ -116,20 +115,20 @@ public class UserQuotaManagerTest {
dag3.getNodes().get(0).getValue().setCurrentAttempts(1);
dag4.getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dag1.getNodes().get(0), false);
- this._quotaManager.checkQuota(dag2.getNodes().get(0), false);
+ this._quotaManager.checkQuota(dag1.getNodes().get(0));
+ this._quotaManager.checkQuota(dag2.getNodes().get(0));
// Should fail due to user quota
Assert.assertThrows(IOException.class, () -> {
- this._quotaManager.checkQuota(dag3.getNodes().get(0), false);
+ this._quotaManager.checkQuota(dag3.getNodes().get(0));
});
// Should fail due to flowgroup quota
Assert.assertThrows(IOException.class, () -> {
- this._quotaManager.checkQuota(dag4.getNodes().get(0), false);
+ this._quotaManager.checkQuota(dag4.getNodes().get(0));
});
// should pass due to quota being released
this._quotaManager.releaseQuota(dag2.getNodes().get(0));
- this._quotaManager.checkQuota(dag3.getNodes().get(0), false);
- this._quotaManager.checkQuota(dag4.getNodes().get(0), false);
+ this._quotaManager.checkQuota(dag3.getNodes().get(0));
+ this._quotaManager.checkQuota(dag4.getNodes().get(0));
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
new file mode 100644
index 000000000..4bdd45597
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+public class MysqlUserQuotaManagerTest {
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "quotas";
+ private static final String PROXY_USER = "abora";
+ private MysqlUserQuotaManager quotaManager;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+ Config config = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+
+ this.quotaManager = new MysqlUserQuotaManager(config);
+ }
+
+ @Test
+ public void testIncreaseCount() throws Exception {
+ int prevCount = this.quotaManager.incrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(prevCount, 0);
+
+ prevCount = this.quotaManager.incrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(prevCount, 1);
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT), 2);
+
+ prevCount = this.quotaManager.incrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ Assert.assertEquals(prevCount, 0);
+
+ prevCount = this.quotaManager.incrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ Assert.assertEquals(prevCount, 1);
+ }
+
+ @Test(dependsOnMethods = "testIncreaseCount")
+ public void testDecreaseCount() throws Exception {
+ this.quotaManager.decrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT), 1);
+
+ this.quotaManager.decrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT), 0);
+
+ this.quotaManager.decrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT), 0);
+
+ this.quotaManager.decrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), 1);
+ this.quotaManager.decrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ // on count reduced to zero, the row should get deleted and the get call
should return -1 instead of 0.
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), -1);
+ }
+
+ class ChangeCountRunnable implements Runnable {
+ boolean increaseOrDecrease;
+
+ public ChangeCountRunnable(boolean increaseOrDecrease) {
+ this.increaseOrDecrease = increaseOrDecrease;
+ }
+
+ @Override
+ public void run() {
+ int i = 0;
+ while (i++ < 1000) {
+ try {
+ if (increaseOrDecrease) {
+
MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ } else {
+
MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT);
+ }
+ } catch (IOException e) {
+ Assert.fail("Thread got an exception.", e);
+ }
+ }
+ }
+ }
+
+ @Test(dependsOnMethods = "testDecreaseCount")
+ public void testConcurrentChanges() throws IOException, InterruptedException
{
+ Runnable increaseCountRunnable = new ChangeCountRunnable(true);
+ Runnable decreaseCountRunnable = new ChangeCountRunnable(false);
+ Thread thread1 = new Thread(increaseCountRunnable);
+ Thread thread2 = new Thread(increaseCountRunnable);
+ Thread thread3 = new Thread(increaseCountRunnable);
+ Thread thread4 = new Thread(decreaseCountRunnable);
+ Thread thread5 = new Thread(decreaseCountRunnable);
+ Thread thread6 = new Thread(decreaseCountRunnable);
+
+ thread1.start();
+ thread2.start();
+ thread3.start();
+ thread1.join();
+ thread2.join();
+ thread3.join();
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT), 3000);
+ thread4.start();
+ thread5.start();
+ thread6.start();
+ thread4.join();
+ thread5.join();
+ thread6.join();
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT), -1);
+ }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index a219148bc..107892243 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -47,6 +47,8 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
+import
org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
@@ -76,7 +78,7 @@ public class GobblinServiceJobSchedulerTest {
private Config quotaConfig;
@BeforeClass
public void setUp() {
- this.quotaConfig =
ConfigFactory.empty().withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA,
ConfigValueFactory.fromAnyRef("group1:1"));
+ this.quotaConfig =
ConfigFactory.empty().withValue(AbstractUserQuotaManager.PER_FLOWGROUP_QUOTA,
ConfigValueFactory.fromAnyRef("group1:1"));
}
/**
* Test whenever JobScheduler is calling setActive, the FlowSpec is loading
into scheduledFlowSpecs (eventually)
@@ -107,7 +109,7 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
- UserQuotaManager quotaManager = new UserQuotaManager(quotaConfig);
+ UserQuotaManager quotaManager = new InMemoryUserQuotaManager(quotaConfig);
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
@@ -197,7 +199,7 @@ public class GobblinServiceJobSchedulerTest {
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null, false);
+ ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null,
false);
SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -260,7 +262,7 @@ public class GobblinServiceJobSchedulerTest {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)),
schedulerService, false);
+ ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
schedulerService, false);
schedulerService.startAsync().awaitRunning();
scheduler.startUp();
@@ -330,7 +332,7 @@ public class GobblinServiceJobSchedulerTest {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new
GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
UserQuotaManager(quotaConfig)), Optional.absent(), false);
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false);
schedulerService.startAsync().awaitRunning();
scheduler.startUp();
@@ -348,7 +350,7 @@ public class GobblinServiceJobSchedulerTest {
//Mock a GaaS scheduler in warm standby mode, where we don't check quota
GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new
GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
UserQuotaManager(quotaConfig)), Optional.absent(), true);
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true);
schedulerWithWarmStandbyEnabled.startUp();
schedulerWithWarmStandbyEnabled.setActive(true);