This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fa97ff5d4 add MysqlUserQuotaManager (#3545)
fa97ff5d4 is described below

commit fa97ff5d4411c6edb349a05b7c15adcfb8e54b9c
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Sep 6 13:07:03 2022 -0700

    add MysqlUserQuotaManager (#3545)
    
    fix unit test and address review comments
    address review comments
    merge conflicts
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   4 +
 .../testing/TestMetastoreDatabaseServer.java       |   2 +-
 .../modules/core/GobblinServiceGuiceModule.java    |   4 +-
 ...aManager.java => AbstractUserQuotaManager.java} | 221 +++++++++---------
 .../service/modules/orchestration/DagManager.java  |  15 +-
 .../modules/orchestration/DagManagerUtils.java     |   5 +
 .../orchestration/InMemoryUserQuotaManager.java    | 119 ++++++++++
 .../orchestration/MysqlUserQuotaManager.java       | 259 +++++++++++++++++++++
 .../modules/orchestration/UserQuotaManager.java    | 217 ++---------------
 .../scheduler/GobblinServiceJobScheduler.java      |   5 +-
 .../gobblin/service/GobblinServiceManagerTest.java |   5 +-
 .../modules/orchestration/DagManagerTest.java      |   4 +-
 ...Test.java => InMemoryUserQuotaManagerTest.java} |  35 ++-
 .../orchestration/MysqlUserQuotaManagerTest.java   | 138 +++++++++++
 .../scheduler/GobblinServiceJobSchedulerTest.java  |  14 +-
 15 files changed, 692 insertions(+), 355 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index dc149e20b..dbb9ef072 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -142,6 +142,10 @@ public class ServiceConfigKeys {
 
   public static final String FORCE_LEADER = GOBBLIN_SERVICE_PREFIX + 
"forceLeader";
   public static final boolean DEFAULT_FORCE_LEADER = false;
+
+  public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX + 
"quotaManager.class";
+  public static final String DEFAULT_QUOTA_MANAGER = 
"org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager";
+
   // Group Membership authentication service
   public static final String GROUP_OWNERSHIP_SERVICE_CLASS = 
GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
   public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = 
"org.apache.gobblin.service.NoopGroupOwnershipService";
diff --git 
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
 
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
index 77786bd45..ebbd0ad98 100644
--- 
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
+++ 
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
@@ -86,7 +86,7 @@ class TestMetastoreDatabaseServer implements Closeable {
     this.dbHost = this.embeddedMysqlEnabled ? "localhost" : 
realConfig.getString(DBHOST_KEY);
     this.dbPort = this.embeddedMysqlEnabled ? new 
PortUtils.ServerSocketPortLocator().random() : realConfig.getInt(DBPORT_KEY);
 
-    this.log.error("Starting with config: embeddedMysqlEnabled={} 
dbUserName={} dbHost={} dbPort={}",
+    this.log.info("Starting with config: embeddedMysqlEnabled={} dbUserName={} 
dbHost={} dbPort={}",
                   this.embeddedMysqlEnabled,
                   this.dbUserName,
                   this.dbHost,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 653354d45..310073a8c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -200,7 +200,9 @@ public class GobblinServiceGuiceModule implements Module {
       binder.bind(SchedulerService.class);
       binder.bind(GobblinServiceJobScheduler.class);
       OptionalBinder.newOptionalBinder(binder, UserQuotaManager.class);
-      binder.bind(UserQuotaManager.class);
+      binder.bind(UserQuotaManager.class)
+          .to(getClassByNameOrAlias(UserQuotaManager.class, 
serviceConfig.getInnerConfig(),
+              ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER));
     }
 
     if (serviceConfig.isGitConfigMonitorEnabled()) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
similarity index 50%
copy from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
copy to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index 36b4d2718..8d6cab041 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -14,51 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import javax.inject.Singleton;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.exception.QuotaExceededException;
-import org.apache.gobblin.service.RequesterService;
-import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
- * Manages the statically configured user quotas for both the proxy user in 
user.to.proxy configuration and the API requester(s)
- * Is used by the dag manager to ensure that the number of currently running 
jobs do not exceed the quota, if the quota
- * is exceeded, then the execution will fail without running on the underlying 
executor
+ * An abstract implementation of {@link UserQuotaManager} that provides a base 
implementation for checking quota and increasing/decreasing it.
  */
 @Slf4j
-@Singleton
-public class UserQuotaManager {
+abstract public class AbstractUserQuotaManager implements UserQuotaManager {
   public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
   public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
   public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
   public static final String QUOTA_SEPERATOR = ":";
   public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
-  private final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
-  private final Map<String, Integer> flowGroupToJobCount = new 
ConcurrentHashMap<>();
-  private final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
   private final Map<String, Integer> perUserQuota;
   private final Map<String, Integer> perFlowGroupQuota;
-  Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
+  // TODO : we might want to make this field implementation specific to be 
able to decide if the dag is already running or has been accepted
+  Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
   private final int defaultQuota;
 
-  @Inject
-  public UserQuotaManager(Config config) {
+  public AbstractUserQuotaManager(Config config) {
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
     ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
     ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
@@ -74,151 +69,147 @@ public class UserQuotaManager {
     this.perFlowGroupQuota = flowGroupMapBuilder.build();
   }
 
-  /**
-   * Checks if the dagNode exceeds the statically configured user quota for 
both the proxy user, requester user, and flowGroup
-   * @throws QuotaExceededException if the quota is exceeded, and logs a 
statement
-   */
-  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean 
onInit) throws QuotaExceededException {
+  // Implementations should return the current count and then increase it by one
+  abstract int incrementJobCount(String key, CountType countType) throws 
IOException;
+
+  abstract void decrementJobCount(String key, CountType countType) throws 
IOException;
+
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException {
+    QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode);
+
+    // Throw errors for reaching the quota at the end to avoid inconsistent job counts
+    if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || 
!quotaCheck.flowGroupCheck)) {
+      // roll back the increased counts in this block
+      rollbackIncrements(dagNode);
+      throw new QuotaExceededException(quotaCheck.requesterMessage);
+    }
+  }
+
+  private void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode) 
throws IOException {
+    String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
ConfigurationKeys.FLOW_GROUP_KEY, "");
+    List<String> usersQuotaIncrement = 
DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode));
+
+    decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
CountType.USER_COUNT);
+    decrementQuotaUsageForUsers(usersQuotaIncrement);
+    decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, 
dagNode), CountType.FLOWGROUP_COUNT);
+    runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+  }
+
+  protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> 
dagNode) throws IOException {
+    QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
     // Dag is already being tracked, no need to double increment for retries 
and multihop flows
-    if (isDagCurrentlyRunning(dagNode)) {
-      return;
+    if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode))) {
+      return quotaCheck;
+    } else {
+      runningDagIds.add(DagManagerUtils.generateDagId(dagNode));
     }
+
     String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
     String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
         ConfigurationKeys.FLOW_GROUP_KEY, "");
     String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
-    boolean proxyUserCheck = true;
-    Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for 
which quota is increased
     StringBuilder requesterMessage = new StringBuilder();
-    runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
-    if (proxyUser != null) {
+
+    boolean proxyUserCheck;
+
+    if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
       int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
-          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
proxyUserToJobCount, dagNode, getQuotaForUser(proxyUser));
+          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
getQuotaForUser(proxyUser), CountType.USER_COUNT);
       proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
+      quotaCheck.setProxyUserCheck(proxyUserCheck);
       if (!proxyUserCheck) {
-        // add 1 to proxyUserIncrement since count starts at 0, and is 
negative if quota is exceeded
+        // add 1 to proxyQuotaIncrement since it is the count 
before the increment
         requesterMessage.append(String.format(
             "Quota exceeded for proxy user %s on executor %s : quota=%s, 
requests above quota=%d%n",
-            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
+            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
       }
     }
 
     String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
     boolean requesterCheck = true;
 
-    if (serializedRequesters != null) {
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
       List<String> uniqueRequesters = 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
       for (String requester : uniqueRequesters) {
         int userQuotaIncrement = incrementJobCountAndCheckQuota(
-            DagManagerUtils.getUserQuotaKey(requester, dagNode), 
requesterToJobCount, dagNode, getQuotaForUser(requester));
+            DagManagerUtils.getUserQuotaKey(requester, dagNode), 
getQuotaForUser(requester), CountType.REQUESTER_COUNT);
         boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
-        usersQuotaIncrement.add(requester);
         requesterCheck = requesterCheck && thisRequesterCheck;
+        quotaCheck.setRequesterCheck(requesterCheck);
         if (!thisRequesterCheck) {
           requesterMessage.append(String.format(
-              "Quota exceeded for requester %s on executor %s : quota=%s, 
requests above quota=%d%n",
-              requester, specExecutorUri, getQuotaForUser(requester), 
Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
+              "Quota exceeded for requester %s on executor %s : quota=%s, 
requests above quota=%d%n. ",
+              requester, specExecutorUri, getQuotaForUser(requester), 
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
         }
       }
     }
 
-    int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
-        DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
-    boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
-    if (!flowGroupCheck) {
-      requesterMessage.append(String.format(
-          "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests 
above quota=%d%n",
-          flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), 
Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
-    }
-
-    // Throw errors for reach quota at the end to avoid inconsistent job counts
-    if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
-      // roll back the increased counts in this block
-      decrementQuotaUsage(proxyUserToJobCount, 
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
-      decrementQuotaUsage(flowGroupToJobCount, 
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
-      decrementQuotaUsageForUsers(usersQuotaIncrement);
-      runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
-      throw new QuotaExceededException(requesterMessage.toString());
-    }
-  }
-
-  /**
-   * Increment quota by one for the given map and key.
-   * @return a negative number if quota is already reached
-   *         a positive number if the quota is not reached
-   *         the absolute value of the number is the used quota before this 
increment request
-   *         0 if quota usage is not changed
-   */
-  private int incrementJobCountAndCheckQuota(String key, Map<String, Integer> 
quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
-    // Only increment job count for first attempt, since job is considered 
running between retries
-    // Include the scenario where currentAttempts is 0 (when checked by the 
scheduler)
-    // but it will not double increment due to first ensuring that the dag is 
not already incremented
-    if (dagNode.getValue().getCurrentAttempts() > 1) {
-      return 0;
+    boolean flowGroupCheck;
+
+    if (dagNode.getValue().getCurrentAttempts() <= 1) {
+      int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+          DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+      flowGroupCheck = flowGroupQuotaIncrement >= 0;
+      quotaCheck.setFlowGroupCheck(flowGroupCheck);
+      if (!flowGroupCheck) {
+        requesterMessage.append(String.format("Quota exceeded for flowgroup %s 
on executor %s : quota=%s, requests above quota=%d%n",
+            flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+            Math.abs(flowGroupQuotaIncrement) + 1 - 
getQuotaForFlowGroup(flowGroup)));
+      }
     }
 
-    Integer currentCount;
-    // Modifications must be thread safe since DAGs on DagManagerThreads may 
update the quota for the same user
-    do {
-      currentCount = quotaMap.get(key);
-    } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : 
!quotaMap.replace(key, currentCount, currentCount + 1));
-
-    if (currentCount == null) {
-      currentCount = 0;
-    }
+    quotaCheck.setRequesterMessage(requesterMessage.toString());
 
-    if (currentCount >= quotaForKey) {
-      return -currentCount; // increment must have crossed the quota
-    } else {
-      return currentCount;
-    }
+    return quotaCheck;
   }
 
   /**
    * Decrement the quota by one for the proxy user and requesters 
corresponding to the provided {@link Dag.DagNode}.
    * Returns true if the dag existed in the set of running dags and was 
removed successfully
    */
-  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
-    Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
-    if (val == null) {
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException {
+    boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+    if (!val) {
       return false;
     }
+
     String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
     if (proxyUser != null) {
       String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, 
dagNode);
-      decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
+      decrementJobCount(proxyUserKey, CountType.USER_COUNT);
     }
+
     String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
         ConfigurationKeys.FLOW_GROUP_KEY, "");
-    decrementQuotaUsage(flowGroupToJobCount, 
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
+    decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, 
dagNode), CountType.FLOWGROUP_COUNT);
+
     String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
-    if (serializedRequesters != null) {
-      try {
-        for (ServiceRequester requester : 
RequesterService.deserialize(serializedRequesters)) {
-          String requesterKey = 
DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
-          decrementQuotaUsage(requesterToJobCount, requesterKey);
-        }
-      } catch (IOException e) {
-        log.error("Failed to release quota for requester list " + 
serializedRequesters, e);
-        return false;
+    try {
+      for (String requester : 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+        String requesterKey = DagManagerUtils.getUserQuotaKey(requester, 
dagNode);
+        decrementJobCount(requesterKey, CountType.REQUESTER_COUNT);
       }
+    } catch (IOException e) {
+      log.error("Failed to release quota for requester list " + 
serializedRequesters, e);
+      return false;
     }
+
     return true;
   }
 
-  private void decrementQuotaUsage(Map<String, Integer> quotaMap, String key) {
-    Integer currentCount;
-    if (key == null) {
-      return;
+  private int incrementJobCountAndCheckQuota(String key, int keyQuota, 
CountType countType) throws IOException {
+    int currentCount = incrementJobCount(key, countType);
+    if (currentCount >= keyQuota) {
+      return -currentCount;
+    } else {
+      return currentCount;
     }
-    do {
-      currentCount = quotaMap.get(key);
-    } while (currentCount != null && currentCount > 0 && 
!quotaMap.replace(key, currentCount, currentCount - 1));
   }
 
-  private void decrementQuotaUsageForUsers(Set<String> 
requestersToDecreaseCount) {
+  private void decrementQuotaUsageForUsers(List<String> 
requestersToDecreaseCount) throws IOException {
     for (String requester : requestersToDecreaseCount) {
-      decrementQuotaUsage(requesterToJobCount, requester);
+      decrementJobCount(requester, CountType.REQUESTER_COUNT);
     }
   }
 
@@ -230,8 +221,18 @@ public class UserQuotaManager {
     return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
   }
 
-  private boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode) 
{
-    return 
this.runningDagIds.containsKey(DagManagerUtils.generateDagId(dagNode));
+  @Setter
+  @AllArgsConstructor
+  protected static class QuotaCheck {
+    boolean proxyUserCheck;
+    boolean requesterCheck;
+    boolean flowGroupCheck;
+    String requesterMessage;
   }
 
-}
+  protected enum CountType {
+    USER_COUNT,
+    REQUESTER_COUNT,
+    FLOWGROUP_COUNT
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f423277cf..3543c0e16 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -350,16 +350,9 @@ public class DagManager extends AbstractIdleService {
 
        this.dagManagerMetrics.activate();
 
-        UserQuotaManager quotaManager = new UserQuotaManager(config);
-        // Before initializing the DagManagerThreads check which dags are 
currently running before shutdown
-        for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
-          for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
-            if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
-              // Add all the currently running Dags to the quota limit per user
-              quotaManager.checkQuota(dagNode, true);
-            }
-          }
-        }
+        UserQuotaManager quotaManager = new InMemoryUserQuotaManager(config);
+        quotaManager.init(dagStateStore.getDags());
+
         //On startup, the service creates DagManagerThreads that are scheduled 
at a fixed rate.
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
@@ -911,7 +904,7 @@ public class DagManager extends AbstractIdleService {
       // Run this spec on selected executor
       SpecProducer<Spec> producer;
       try {
-        quotaManager.checkQuota(dagNode, false);
+        quotaManager.checkQuota(dagNode);
         producer = DagManagerUtils.getSpecProducer(dagNode);
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : 
null;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 9d59a9085..dc79524b3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -345,6 +346,10 @@ public class DagManagerUtils {
   }
 
   static List<String> getDistinctUniqueRequesters(String serializedRequesters) 
{
+    if (serializedRequesters == null) {
+      return Collections.emptyList();
+    }
+
     List<String> uniqueRequesters;
     try {
       uniqueRequesters = RequesterService.deserialize(serializedRequesters)
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
new file mode 100644
index 000000000..438e86d20
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * An implementation of {@link UserQuotaManager} that stores quota usage in 
memory.
+ */
+@Slf4j
+@Singleton
+public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
+  private final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
+  private final Map<String, Integer> flowGroupToJobCount = new 
ConcurrentHashMap<>();
+  private final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
+
+  @Inject
+  public InMemoryUserQuotaManager(Config config) {
+    super(config);
+  }
+
+  public void init(Collection<Dag<JobExecutionPlan>> dags) throws IOException {
+    for (Dag<JobExecutionPlan> dag : dags) {
+      for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+          // Add all the currently running Dags to the quota limit per user
+          increaseAndCheckQuota(dagNode);
+        }
+      }
+    }
+  }
+
+  private int incrementJobCount(String key, Map<String, Integer> quotaMap) {
+    Integer currentCount;
+    // Modifications must be thread safe since DAGs on DagManagerThreads may 
update the quota for the same user
+    do {
+      currentCount = quotaMap.get(key);
+    } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : 
!quotaMap.replace(key, currentCount, currentCount + 1));
+
+    if (currentCount == null) {
+      currentCount = 0;
+    }
+
+    return currentCount;
+  }
+
+  private void decrementJobCount(String key, Map<String, Integer> quotaMap)  {
+    Integer currentCount;
+    if (key == null) {
+      return;
+    }
+    do {
+      currentCount = quotaMap.get(key);
+    } while (currentCount != null && currentCount > 0 && 
!quotaMap.replace(key, currentCount, currentCount - 1));
+
+    if (currentCount == null || currentCount == 0) {
+      log.warn("Decrement job count was called for " + key + " when the count 
was already zero/absent.");
+    }
+  }
+
+  @Override
+  int incrementJobCount(String user, CountType countType) throws IOException {
+    switch (countType) {
+      case USER_COUNT:
+        return incrementJobCount(user, proxyUserToJobCount);
+      case REQUESTER_COUNT:
+        return incrementJobCount(user, requesterToJobCount);
+      case FLOWGROUP_COUNT:
+        return incrementJobCount(user, flowGroupToJobCount);
+      default:
+        throw new IOException("Invalid count type " + countType);
+    }
+  }
+
+  @Override
+  void decrementJobCount(String user, CountType countType) throws IOException {
+    switch (countType) {
+      case USER_COUNT:
+        decrementJobCount(user, proxyUserToJobCount);
+        break;
+      case REQUESTER_COUNT:
+        decrementJobCount(user, requesterToJobCount);
+        break;
+      case FLOWGROUP_COUNT:
+        decrementJobCount(user, flowGroupToJobCount);
+        break;
+      default:
+        throw new IOException("Invalid count type " + countType);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
new file mode 100644
index 000000000..78f15cfba
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import javax.inject.Singleton;
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An implementation of {@link UserQuotaManager} that stores quota usage in 
mysql.
+ */
+@Slf4j
+@Singleton
+public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
+  private final MysqlQuotaStore mysqlStore;
+
+  /**
+   * Creates the manager: hands {@code config} to the parent class and then builds the MySQL-backed
+   * quota store through {@link #createQuotaStore(Config)}.
+   *
+   * @param config configuration passed to the parent class and to {@link #createQuotaStore(Config)}
+   * @throws IOException if the quota store cannot be created
+   */
+  @Inject
+  public MysqlUserQuotaManager(Config config) throws IOException {
+    super(config);
+    // NOTE(review): createQuotaStore is protected, so a subclass override runs before the subclass is initialized.
+    this.mysqlStore = createQuotaStore(config);
+  }
+
+  /**
+   * Intentionally a no-op: this implementation does not need to update quota usage when the service
+   * restarts or its leadership status changes.
+   *
+   * @param dags ignored by this implementation
+   */
+  public void init(Collection<Dag<JobExecutionPlan>> dags) {
+  }
+
+  /**
+   * Increments the count of {@code countType} for {@code user} in the MySQL quota store.
+   *
+   * @return the value reported by {@link MysqlQuotaStore#increaseCount(String, CountType)}
+   * @throws IOException if the store fails; a {@link SQLException} is wrapped as the cause
+   */
+  @Override
+  int incrementJobCount(String user, CountType countType) throws IOException {
+    final int storeResult;
+    try {
+      storeResult = this.mysqlStore.increaseCount(user, countType);
+    } catch (SQLException sqlFailure) {
+      throw new IOException(sqlFailure);
+    }
+    return storeResult;
+  }
+
+  /**
+   * Decrements the count of {@code countType} for {@code user} in the MySQL quota store.
+   *
+   * @throws IOException if the store fails; a {@link SQLException} is wrapped as the cause
+   */
+  @Override
+  void decrementJobCount(String user, CountType countType) throws IOException {
+    try {
+      mysqlStore.decreaseCount(user, countType);
+    } catch (SQLException sqlFailure) {
+      throw new IOException(sqlFailure);
+    }
+  }
+
+  /**
+   * Test hook that reads the stored count of {@code countType} for {@code name} straight from the quota store.
+   *
+   * @throws IOException if the store cannot be queried
+   */
+  @VisibleForTesting
+  int getCount(String name, CountType countType) throws IOException {
+    final int storedCount = mysqlStore.getCount(name, countType);
+    return storedCount;
+  }
+
+  /**
+   * Creates the {@link MysqlQuotaStore} backing this manager.
+   * The table name is read from {@link ConfigurationKeys#STATE_STORE_DB_TABLE_KEY}, falling back to
+   * {@link ConfigurationKeys#DEFAULT_STATE_STORE_DB_TABLE}; the data source is built from the same config.
+   *
+   * @param config configuration holding the table name and the data source settings
+   * @throws IOException if the store cannot be created
+   */
+  protected MysqlQuotaStore createQuotaStore(Config config) throws IOException {
+    final String quotaTableName = ConfigUtils.getString(
+        config,
+        ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+
+    return new MysqlQuotaStore(MysqlStateStore.newDataSource(config), quotaTableName);
+  }
+
+  static class MysqlQuotaStore {
+    protected final DataSource dataSource;
+    final String tableName;
+    private final String GET_USER_COUNT;
+    private final String GET_REQUESTER_COUNT;
+    private final String GET_FLOWGROUP_COUNT;
+    private final String INCREASE_USER_COUNT_SQL;
+    private final String INCREASE_REQUESTER_COUNT_SQL;
+    private final String INCREASE_FLOW_COUNT_SQL;
+    private final String DECREASE_USER_COUNT_SQL;
+    private final String DECREASE_REQUESTER_COUNT_SQL;
+    private final String DECREASE_FLOWGROUP_COUNT_SQL;
+    private final String DELETE_USER_SQL;
+
+    /**
+     * Builds the SQL statements used against {@code tableName} and creates the table if it does not exist.
+     * Each row is keyed by {@code name} and carries three counters: {@code user_count},
+     * {@code requester_count} and {@code flowgroup_count}.
+     *
+     * @param dataSource source of JDBC connections to the quota database
+     * @param tableName name of the quota table
+     * @throws IOException if the table cannot be created
+     */
+    public MysqlQuotaStore(BasicDataSource dataSource, String tableName)
+        throws IOException {
+      this.dataSource = dataSource;
+      this.tableName = tableName;
+
+      GET_USER_COUNT = "SELECT user_count FROM " + tableName + " WHERE name = ? FOR UPDATE";
+      GET_REQUESTER_COUNT = "SELECT requester_count FROM " + tableName + " WHERE name = ? FOR UPDATE";
+      GET_FLOWGROUP_COUNT = "SELECT flowgroup_count FROM " + tableName + " WHERE name = ? FOR UPDATE";
+      INCREASE_USER_COUNT_SQL = "INSERT INTO " + tableName + " (name, user_count) VALUES (?, 1) "
+          + "ON DUPLICATE KEY UPDATE user_count=user_count+1";
+      INCREASE_REQUESTER_COUNT_SQL = "INSERT INTO " + tableName + " (name, requester_count) VALUES (?, 1) "
+          + "ON DUPLICATE KEY UPDATE requester_count=requester_count+1";
+      INCREASE_FLOW_COUNT_SQL = "INSERT INTO " + tableName + " (name, flowgroup_count) VALUES (?, 1) "
+          + "ON DUPLICATE KEY UPDATE flowgroup_count=flowgroup_count+1";
+      // Every decrement is clamped at zero so that a spurious release can never drive a counter negative.
+      DECREASE_USER_COUNT_SQL = "UPDATE " + tableName
+          + " SET user_count=GREATEST(0, user_count-1) WHERE name = ?";
+      DECREASE_REQUESTER_COUNT_SQL = "UPDATE " + tableName
+          + " SET requester_count=GREATEST(0, requester_count-1) WHERE name = ?";
+      DECREASE_FLOWGROUP_COUNT_SQL = "UPDATE " + tableName
+          + " SET flowgroup_count=GREATEST(0, flowgroup_count-1) WHERE name = ?";
+      // A row may only be dropped once all three counters are exhausted; otherwise a still-active
+      // requester count stored in the same row would be lost.
+      DELETE_USER_SQL = "DELETE FROM " + tableName
+          + " WHERE name = ? AND user_count<1 AND requester_count<1 AND flowgroup_count<1";
+
+      // NOTE(review): name is limited to VARCHAR(20); confirm that quota keys can never be longer than that.
+      String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName
+          + " (name VARCHAR(20) CHARACTER SET latin1 NOT NULL, "
+          + "user_count INT NOT NULL DEFAULT 0, requester_count INT NOT NULL DEFAULT 0, "
+          + "flowgroup_count INT NOT NULL DEFAULT 0, "
+          + "PRIMARY KEY (name), " + "UNIQUE INDEX ind (name))";
+      try (Connection connection = dataSource.getConnection();
+          PreparedStatement createStatement = connection.prepareStatement(createQuotaTable)) {
+        createStatement.executeUpdate();
+      } catch (SQLException e) {
+        throw new IOException("Failure creating table " + tableName, e);
+      }
+    }
+
+    /**
+     * Returns the stored count of {@code countType} for {@code name}.
+     *
+     * @param name key of the row to read
+     * @param countType which counter to read; anything other than {@code USER_COUNT} or
+     *        {@code REQUESTER_COUNT} reads the flow group counter
+     * @return the stored count, or -1 if there is no row for {@code name}
+     * @throws IOException if the query fails
+     */
+    @VisibleForTesting
+    int getCount(String name, CountType countType) throws IOException {
+      String selectStatement;
+      if (countType == CountType.USER_COUNT) {
+        selectStatement = GET_USER_COUNT;
+      } else if (countType == CountType.REQUESTER_COUNT) {
+        // Requester counts live in their own column; they must not be answered from flowgroup_count.
+        selectStatement = GET_REQUESTER_COUNT;
+      } else {
+        selectStatement = GET_FLOWGROUP_COUNT;
+      }
+      try (Connection connection = dataSource.getConnection();
+          PreparedStatement queryStatement = connection.prepareStatement(selectStatement)) {
+        queryStatement.setString(1, name);
+        try (ResultSet rs = queryStatement.executeQuery()) {
+          if (rs.next()) {
+            return rs.getInt(1);
+          } else {
+            // -1 distinguishes "no row" from a row whose counter is zero.
+            return -1;
+          }
+        }
+      } catch (Exception e) {
+        throw new IOException("failure retrieving count from user/flowGroup " + name, e);
+      }
+    }
+
+    /**
+     * Increments the counter of {@code countType} for {@code name} by one, creating the row if needed.
+     * The previous value is read and the increment applied inside a single transaction.
+     *
+     * @param name key of the row to update
+     * @param countType which counter to increment
+     * @return the counter value before the increment, or 0 if there was no row for {@code name}
+     * @throws IOException if {@code countType} is not supported or if reading/updating the counter fails
+     * @throws SQLException if a connection cannot be obtained or configured, or if the rollback fails
+     */
+    public int increaseCount(String name, CountType countType) throws IOException, SQLException {
+      String selectStatement;
+      String increaseStatement;
+
+      // Resolve the statements before borrowing a connection so an invalid type cannot leak one.
+      switch(countType) {
+        case USER_COUNT:
+          selectStatement = GET_USER_COUNT;
+          increaseStatement = INCREASE_USER_COUNT_SQL;
+          break;
+        case REQUESTER_COUNT:
+          selectStatement = GET_REQUESTER_COUNT;
+          increaseStatement = INCREASE_REQUESTER_COUNT_SQL;
+          break;
+        case FLOWGROUP_COUNT:
+          selectStatement = GET_FLOWGROUP_COUNT;
+          increaseStatement = INCREASE_FLOW_COUNT_SQL;
+          break;
+        default:
+          throw new IOException("Invalid count type " + countType);
+      }
+
+      // try-with-resources guarantees the connection is returned even if setAutoCommit fails.
+      try (Connection connection = dataSource.getConnection()) {
+        connection.setAutoCommit(false);
+        try (PreparedStatement statement1 = connection.prepareStatement(selectStatement);
+            PreparedStatement statement2 = connection.prepareStatement(increaseStatement)) {
+          statement1.setString(1, name);
+          statement2.setString(1, name);
+
+          // Read the previous value before committing, while the row lock is still held.
+          int previousCount = 0;
+          try (ResultSet rs = statement1.executeQuery()) {
+            if (rs.next()) {
+              previousCount = rs.getInt(1);
+            }
+          }
+          statement2.executeUpdate();
+          connection.commit();
+          return previousCount;
+        } catch (SQLException e) {
+          connection.rollback();
+          throw new IOException("Failure increasing count for user/flowGroup " + name, e);
+        }
+      }
+    }
+
+    /**
+     * Decrements the counter of {@code countType} for {@code name} by one and then runs the row clean-up
+     * statement, all inside a single transaction. Logs a warning when there was nothing to decrement,
+     * i.e. the row is absent or the counter is already zero.
+     *
+     * @param name key of the row to update
+     * @param countType which counter to decrement
+     * @throws IOException if {@code countType} is not supported or if reading/updating the counter fails
+     * @throws SQLException if a connection cannot be obtained or configured, or if the rollback fails
+     */
+    public void decreaseCount(String name, CountType countType) throws IOException, SQLException {
+      String selectStatement;
+      String decreaseStatement;
+
+      // Resolve the statements before borrowing a connection so an invalid type cannot leak one.
+      switch(countType) {
+        case USER_COUNT:
+          selectStatement = GET_USER_COUNT;
+          decreaseStatement = DECREASE_USER_COUNT_SQL;
+          break;
+        case REQUESTER_COUNT:
+          selectStatement = GET_REQUESTER_COUNT;
+          decreaseStatement = DECREASE_REQUESTER_COUNT_SQL;
+          break;
+        case FLOWGROUP_COUNT:
+          selectStatement = GET_FLOWGROUP_COUNT;
+          decreaseStatement = DECREASE_FLOWGROUP_COUNT_SQL;
+          break;
+        default:
+          throw new IOException("Invalid count type " + countType);
+      }
+
+      // try-with-resources guarantees the connection is returned even if setAutoCommit fails.
+      try (Connection connection = dataSource.getConnection()) {
+        connection.setAutoCommit(false);
+        try (PreparedStatement statement1 = connection.prepareStatement(selectStatement);
+            PreparedStatement statement2 = connection.prepareStatement(decreaseStatement);
+            PreparedStatement statement3 = connection.prepareStatement(DELETE_USER_SQL)) {
+          statement1.setString(1, name);
+          statement2.setString(1, name);
+          statement3.setString(1, name);
+
+          // Inspect the previous value before committing; a missing row counts as "nothing to decrement".
+          boolean nothingToDecrement;
+          try (ResultSet rs = statement1.executeQuery()) {
+            nothingToDecrement = !rs.next() || rs.getInt(1) == 0;
+          }
+          statement2.executeUpdate();
+          statement3.executeUpdate();
+          connection.commit();
+          if (nothingToDecrement) {
+            log.warn("Decrement job count was called for " + name + " when the count was already zero/absent.");
+          }
+        } catch (SQLException e) {
+          connection.rollback();
+          throw new IOException("Failure decreasing count from user/flowGroup " + name, e);
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index 36b4d2718..752956d11 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -14,224 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.inject.Singleton;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import java.util.Collection;
+
 import org.apache.gobblin.exception.QuotaExceededException;
-import org.apache.gobblin.service.RequesterService;
-import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.util.ConfigUtils;
-
 
 /**
- * Manages the statically configured user quotas for both the proxy user in 
user.to.proxy configuration and the API requester(s)
- * Is used by the dag manager to ensure that the number of currently running 
jobs do not exceed the quota, if the quota
- * is exceeded, then the execution will fail without running on the underlying 
executor
+ * Manages the statically configured user quotas for the proxy user in 
user.to.proxy configuration, the API requester(s)
+ * and the flow group.
+ * It is used by the {@link DagManager} to ensure that the number of currently 
running jobs do not exceed the quota, if
+ * the quota is exceeded, the execution will fail without running on the 
underlying executor.
  */
-@Slf4j
-@Singleton
-public class UserQuotaManager {
-  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + 
"perUserQuota";
-  public static final String PER_FLOWGROUP_QUOTA = 
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
-  public static final String USER_JOB_QUOTA_KEY = 
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
-  public static final String QUOTA_SEPERATOR = ":";
-  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
-  private final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
-  private final Map<String, Integer> flowGroupToJobCount = new 
ConcurrentHashMap<>();
-  private final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
-  private final Map<String, Integer> perUserQuota;
-  private final Map<String, Integer> perFlowGroupQuota;
-  Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
-  private final int defaultQuota;
-
-  @Inject
-  public UserQuotaManager(Config config) {
-    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
-    ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
-    ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
-    // Quotas will take form of user:<Quota> and flowGroup:<Quota>
-    for (String flowGroupQuota : ConfigUtils.getStringList(config, 
PER_FLOWGROUP_QUOTA)) {
-      flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1]));
-    }
-    // Keep quotas per user as well in form user:<Quota> which apply for all 
flowgroups
-    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) 
{
-      userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
-    }
-    this.perUserQuota = userMapBuilder.build();
-    this.perFlowGroupQuota = flowGroupMapBuilder.build();
-  }
+public interface UserQuotaManager {
 
   /**
-   * Checks if the dagNode exceeds the statically configured user quota for 
both the proxy user, requester user, and flowGroup
-   * @throws QuotaExceededException if the quota is exceeded, and logs a 
statement
+   * Initialize with the provided set of dags.
    */
-  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean 
onInit) throws QuotaExceededException {
-    // Dag is already being tracked, no need to double increment for retries 
and multihop flows
-    if (isDagCurrentlyRunning(dagNode)) {
-      return;
-    }
-    String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
-    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
-        ConfigurationKeys.FLOW_GROUP_KEY, "");
-    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
-    boolean proxyUserCheck = true;
-    Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for 
which quota is increased
-    StringBuilder requesterMessage = new StringBuilder();
-    runningDagIds.put(DagManagerUtils.generateDagId(dagNode), true);
-    if (proxyUser != null) {
-      int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
-          DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), 
proxyUserToJobCount, dagNode, getQuotaForUser(proxyUser));
-      proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check 
succeeds
-      if (!proxyUserCheck) {
-        // add 1 to proxyUserIncrement since count starts at 0, and is 
negative if quota is exceeded
-        requesterMessage.append(String.format(
-            "Quota exceeded for proxy user %s on executor %s : quota=%s, 
requests above quota=%d%n",
-            proxyUser, specExecutorUri, getQuotaForUser(proxyUser), 
Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser)));
-      }
-    }
-
-    String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
-    boolean requesterCheck = true;
-
-    if (serializedRequesters != null) {
-      List<String> uniqueRequesters = 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
-      for (String requester : uniqueRequesters) {
-        int userQuotaIncrement = incrementJobCountAndCheckQuota(
-            DagManagerUtils.getUserQuotaKey(requester, dagNode), 
requesterToJobCount, dagNode, getQuotaForUser(requester));
-        boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota 
check succeeds
-        usersQuotaIncrement.add(requester);
-        requesterCheck = requesterCheck && thisRequesterCheck;
-        if (!thisRequesterCheck) {
-          requesterMessage.append(String.format(
-              "Quota exceeded for requester %s on executor %s : quota=%s, 
requests above quota=%d%n",
-              requester, specExecutorUri, getQuotaForUser(requester), 
Math.abs(userQuotaIncrement)-getQuotaForUser(requester)));
-        }
-      }
-    }
-
-    int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
-        DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
flowGroupToJobCount, dagNode, getQuotaForFlowGroup(flowGroup));
-    boolean flowGroupCheck = flowGroupQuotaIncrement >= 0;
-    if (!flowGroupCheck) {
-      requesterMessage.append(String.format(
-          "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests 
above quota=%d%n",
-          flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), 
Math.abs(flowGroupQuotaIncrement)+1-getQuotaForFlowGroup(flowGroup)));
-    }
-
-    // Throw errors for reach quota at the end to avoid inconsistent job counts
-    if ((!proxyUserCheck || !requesterCheck || !flowGroupCheck) && !onInit) {
-      // roll back the increased counts in this block
-      decrementQuotaUsage(proxyUserToJobCount, 
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
-      decrementQuotaUsage(flowGroupToJobCount, 
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
-      decrementQuotaUsageForUsers(usersQuotaIncrement);
-      runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
-      throw new QuotaExceededException(requesterMessage.toString());
-    }
-  }
+  void init(Collection<Dag<JobExecutionPlan>> dags) throws IOException;
 
   /**
-   * Increment quota by one for the given map and key.
-   * @return a negative number if quota is already reached
-   *         a positive number if the quota is not reached
-   *         the absolute value of the number is the used quota before this 
increment request
-   *         0 if quota usage is not changed
+   * Checks if the dagNode exceeds the statically configured user quota for 
the proxy user, requester user and flowGroup.
+   * It also increases the quota usage for proxy user, requester and the 
flowGroup of the given DagNode by one.
+   * @throws QuotaExceededException if the quota is exceeded
    */
-  private int incrementJobCountAndCheckQuota(String key, Map<String, Integer> 
quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
-    // Only increment job count for first attempt, since job is considered 
running between retries
-    // Include the scenario where currentAttempts is 0 (when checked by the 
scheduler)
-    // but it will not double increment due to first ensuring that the dag is 
not already incremented
-    if (dagNode.getValue().getCurrentAttempts() > 1) {
-      return 0;
-    }
-
-    Integer currentCount;
-    // Modifications must be thread safe since DAGs on DagManagerThreads may 
update the quota for the same user
-    do {
-      currentCount = quotaMap.get(key);
-    } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : 
!quotaMap.replace(key, currentCount, currentCount + 1));
-
-    if (currentCount == null) {
-      currentCount = 0;
-    }
-
-    if (currentCount >= quotaForKey) {
-      return -currentCount; // increment must have crossed the quota
-    } else {
-      return currentCount;
-    }
-  }
+  void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
 
   /**
    * Decrement the quota by one for the proxy user and requesters 
corresponding to the provided {@link Dag.DagNode}.
-   * Returns true if the dag existed in the set of running dags and was 
removed successfully
+   * Returns true if successfully reduces the quota usage
    */
-  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
-    Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
-    if (val == null) {
-      return false;
-    }
-    String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
AzkabanProjectConfig.USER_TO_PROXY, null);
-    if (proxyUser != null) {
-      String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, 
dagNode);
-      decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
-    }
-    String flowGroup = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
-        ConfigurationKeys.FLOW_GROUP_KEY, "");
-    decrementQuotaUsage(flowGroupToJobCount, 
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
-    String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
-    if (serializedRequesters != null) {
-      try {
-        for (ServiceRequester requester : 
RequesterService.deserialize(serializedRequesters)) {
-          String requesterKey = 
DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
-          decrementQuotaUsage(requesterToJobCount, requesterKey);
-        }
-      } catch (IOException e) {
-        log.error("Failed to release quota for requester list " + 
serializedRequesters, e);
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private void decrementQuotaUsage(Map<String, Integer> quotaMap, String key) {
-    Integer currentCount;
-    if (key == null) {
-      return;
-    }
-    do {
-      currentCount = quotaMap.get(key);
-    } while (currentCount != null && currentCount > 0 && 
!quotaMap.replace(key, currentCount, currentCount - 1));
-  }
-
-  private void decrementQuotaUsageForUsers(Set<String> 
requestersToDecreaseCount) {
-    for (String requester : requestersToDecreaseCount) {
-      decrementQuotaUsage(requesterToJobCount, requester);
-    }
-  }
-
-  private int getQuotaForUser(String user) {
-    return this.perUserQuota.getOrDefault(user, defaultQuota);
-  }
-
-  private int getQuotaForFlowGroup(String flowGroup) {
-    return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
-  }
-
-  private boolean isDagCurrentlyRunning(Dag.DagNode<JobExecutionPlan> dagNode) 
{
-    return 
this.runningDagIds.containsKey(DagManagerUtils.generateDagId(dagNode));
-  }
-
+  boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException;
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index c8286740a..438ccff8d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -35,7 +35,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
@@ -336,8 +335,8 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       if (quotaManager.isPresent()) {
         // QuotaManager has idempotent checks for a dagNode, so this check 
won't double add quotas for a flow in the DagManager
         try {
-          quotaManager.get().checkQuota(dag.getNodes().get(0), false);
-        } catch (QuotaExceededException e) {
+          quotaManager.get().checkQuota(dag.getNodes().get(0));
+        } catch (IOException e) {
           throw new RuntimeException(e);
         }
       }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index a66717c56..02f6f58de 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -28,8 +28,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+
+import 
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
 import 
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
-import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
 import org.eclipse.jgit.api.Git;
@@ -179,7 +180,7 @@ public class GobblinServiceManagerTest {
     
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY,
 false);
 
     
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
 MockedSpecCompiler.class.getCanonicalName());
-    serviceCoreProperties.put(UserQuotaManager.PER_USER_QUOTA, "testUser:1");
+    serviceCoreProperties.put(AbstractUserQuotaManager.PER_USER_QUOTA, 
"testUser:1");
     transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, 
"10000");
 
     // Create a bare repository
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index b414a76e6..d85c708f4 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -99,8 +99,8 @@ public class DagManagerTest {
     this.resumeQueue = new LinkedBlockingQueue<>();
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
     Config quotaConfig = ConfigFactory.empty()
-        .withValue(UserQuotaManager.PER_USER_QUOTA, 
ConfigValueFactory.fromAnyRef("user:1"));
-    this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
+        .withValue(AbstractUserQuotaManager.PER_USER_QUOTA, 
ConfigValueFactory.fromAnyRef("user:1"));
+    this._gobblinServiceQuotaManager = new 
InMemoryUserQuotaManager(quotaConfig);
     this._dagManagerMetrics = new DagManagerMetrics(metricContext);
     this._dagManagerMetrics.activate();
     this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
_failedDagStateStore, queue, cancelQueue,
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
similarity index 83%
rename from 
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
rename to 
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
index 9fc9438db..5e63a0f00 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
@@ -29,16 +29,16 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public class UserQuotaManagerTest {
+public class InMemoryUserQuotaManagerTest {
 
-  UserQuotaManager _quotaManager;
+  InMemoryUserQuotaManager _quotaManager;
 
   @BeforeClass
   public void setUp() {
     Config quotaConfig = ConfigFactory.empty()
-        .withValue(UserQuotaManager.PER_USER_QUOTA, 
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
-        .withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA, 
ConfigValueFactory.fromAnyRef("group1:1,group2:2"));
-    this._quotaManager = new UserQuotaManager(quotaConfig);
+        .withValue(AbstractUserQuotaManager.PER_USER_QUOTA, 
ConfigValueFactory.fromAnyRef("user:1,user2:1,user3:1,user6:1"))
+        .withValue(AbstractUserQuotaManager.PER_FLOWGROUP_QUOTA, 
ConfigValueFactory.fromAnyRef("group1:1,group2:2"));
+    this._quotaManager = new InMemoryUserQuotaManager(quotaConfig);
   }
 
   // Tests that if exceeding the quota on startup, do not throw an exception 
and do not decrement the counter
@@ -49,9 +49,8 @@ public class UserQuotaManagerTest {
     dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
     dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), true);
     // Should not be throwing the exception
-    this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), true);
+    this._quotaManager.init(dags);
   }
 
   @Test
@@ -62,9 +61,9 @@ public class UserQuotaManagerTest {
     dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
     dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
     Assert.assertThrows(IOException.class, () -> {
-      this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false);
+      this._quotaManager.checkQuota(dags.get(1).getNodes().get(0));
     });
   }
 
@@ -77,7 +76,7 @@ public class UserQuotaManagerTest {
     dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
     dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
     
Assert.assertTrue(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
     
Assert.assertFalse(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
   }
@@ -92,9 +91,9 @@ public class UserQuotaManagerTest {
     dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
     dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+    this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
     Assert.assertThrows(IOException.class, () -> {
-      this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false);
+      this._quotaManager.checkQuota(dags.get(1).getNodes().get(0));
     });
   }
 
@@ -116,20 +115,20 @@ public class UserQuotaManagerTest {
     dag3.getNodes().get(0).getValue().setCurrentAttempts(1);
     dag4.getNodes().get(0).getValue().setCurrentAttempts(1);
 
-    this._quotaManager.checkQuota(dag1.getNodes().get(0), false);
-    this._quotaManager.checkQuota(dag2.getNodes().get(0), false);
+    this._quotaManager.checkQuota(dag1.getNodes().get(0));
+    this._quotaManager.checkQuota(dag2.getNodes().get(0));
 
     // Should fail due to user quota
     Assert.assertThrows(IOException.class, () -> {
-      this._quotaManager.checkQuota(dag3.getNodes().get(0), false);
+      this._quotaManager.checkQuota(dag3.getNodes().get(0));
     });
     // Should fail due to flowgroup quota
     Assert.assertThrows(IOException.class, () -> {
-      this._quotaManager.checkQuota(dag4.getNodes().get(0), false);
+      this._quotaManager.checkQuota(dag4.getNodes().get(0));
     });
     // should pass due to quota being released
     this._quotaManager.releaseQuota(dag2.getNodes().get(0));
-    this._quotaManager.checkQuota(dag3.getNodes().get(0), false);
-    this._quotaManager.checkQuota(dag4.getNodes().get(0), false);
+    this._quotaManager.checkQuota(dag3.getNodes().get(0));
+    this._quotaManager.checkQuota(dag4.getNodes().get(0));
   }
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
new file mode 100644
index 000000000..4bdd45597
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+
+public class MysqlUserQuotaManagerTest {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "quotas";
+  private static final String PROXY_USER = "abora";
+  private MysqlUserQuotaManager quotaManager;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.quotaManager = new MysqlUserQuotaManager(config);
+  }
+
+  @Test
+  public void testIncreaseCount() throws Exception {
+    int prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(prevCount, 0);
+
+    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(prevCount, 1);
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 2);
+
+    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    Assert.assertEquals(prevCount, 0);
+
+    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    Assert.assertEquals(prevCount, 1);
+  }
+
+  @Test(dependsOnMethods = "testIncreaseCount")
+  public void testDecreaseCount() throws Exception {
+    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 1);
+
+    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 0);
+
+    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 0);
+
+    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), 1);
+    this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    // on count reduced to zero, the row should get deleted and the get call should return -1 instead of 0.
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), -1);
+  }
+
+  class ChangeCountRunnable implements Runnable {
+    boolean increaseOrDecrease;
+
+    public ChangeCountRunnable(boolean increaseOrDecrease) {
+      this.increaseOrDecrease = increaseOrDecrease;
+    }
+
+    @Override
+    public void run() {
+      int i = 0;
+      while (i++ < 1000) {
+        try {
+          if (increaseOrDecrease) {
+            MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+          } else {
+            MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+          }
+        } catch (IOException e) {
+          Assert.fail("Thread got an exception.", e);
+        }
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = "testDecreaseCount")
+  public void testConcurrentChanges() throws IOException, InterruptedException {
+    Runnable increaseCountRunnable = new ChangeCountRunnable(true);
+    Runnable decreaseCountRunnable = new ChangeCountRunnable(false);
+    Thread thread1 = new Thread(increaseCountRunnable);
+    Thread thread2 = new Thread(increaseCountRunnable);
+    Thread thread3 = new Thread(increaseCountRunnable);
+    Thread thread4 = new Thread(decreaseCountRunnable);
+    Thread thread5 = new Thread(decreaseCountRunnable);
+    Thread thread6 = new Thread(decreaseCountRunnable);
+
+    thread1.start();
+    thread2.start();
+    thread3.start();
+    thread1.join();
+    thread2.join();
+    thread3.join();
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 3000);
+    thread4.start();
+    thread5.start();
+    thread6.start();
+    thread4.join();
+    thread5.join();
+    thread6.join();
+    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), -1);
+  }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index a219148bc..107892243 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -47,6 +47,8 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
@@ -76,7 +78,7 @@ public class GobblinServiceJobSchedulerTest {
   private Config quotaConfig;
   @BeforeClass
   public void setUp() {
-    this.quotaConfig = ConfigFactory.empty().withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA, ConfigValueFactory.fromAnyRef("group1:1"));
+    this.quotaConfig = ConfigFactory.empty().withValue(AbstractUserQuotaManager.PER_FLOWGROUP_QUOTA, ConfigValueFactory.fromAnyRef("group1:1"));
   }
   /**
    * Test whenever JobScheduler is calling setActive, the FlowSpec is loading into scheduledFlowSpecs (eventually)
@@ -107,7 +109,7 @@ public class GobblinServiceJobSchedulerTest {
     Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
 
     Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
-    UserQuotaManager quotaManager = new UserQuotaManager(quotaConfig);
+    UserQuotaManager quotaManager = new InMemoryUserQuotaManager(quotaConfig);
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
@@ -197,7 +199,7 @@ public class GobblinServiceJobSchedulerTest {
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null, false);
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null, false);
 
     SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -260,7 +262,7 @@ public class GobblinServiceJobSchedulerTest {
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), schedulerService, false);
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), schedulerService, false);
 
     schedulerService.startAsync().awaitRunning();
     scheduler.startUp();
@@ -330,7 +332,7 @@ public class GobblinServiceJobSchedulerTest {
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler not in warm standby mode
     GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent(), false);
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false);
 
     schedulerService.startAsync().awaitRunning();
     scheduler.startUp();
@@ -348,7 +350,7 @@ public class GobblinServiceJobSchedulerTest {
 
     //Mock a GaaS scheduler in warm standby mode, where we don't check quota
     GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent(), true);
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true);
 
     schedulerWithWarmStandbyEnabled.startUp();
     schedulerWithWarmStandbyEnabled.setActive(true);

Reply via email to