umustafi commented on code in PR #3545: URL: https://github.com/apache/gobblin/pull/3545#discussion_r955301288
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * An abstract implementation of {@link UserQuotaManager} that Review Comment: looks like this docstring was unfinished ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java: ########## @@ -0,0 +1,137 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; + +public class MysqlUserQuotaManagerTest { + private static final String USER = "testUser"; + private static final String PASSWORD = "testPassword"; + private static final String TABLE = "quotas"; + private static final String TEST_NAME = "abora"; + private MysqlUserQuotaManager quotaManager; + + @BeforeClass + public void setUp() throws Exception { + ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get(); + + Config config = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl()) + .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER) + .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD) + 
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE) + .build(); + + this.quotaManager = new MysqlUserQuotaManager(config); + } + + @Test + public void testIncreaseCount() throws Exception { + int prevCount = this.quotaManager.incrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT); + Assert.assertEquals(prevCount, 0); + + prevCount = this.quotaManager.incrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT); + Assert.assertEquals(prevCount, 1); + Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT), 2); + + prevCount = this.quotaManager.incrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + Assert.assertEquals(prevCount, 0); + + prevCount = this.quotaManager.incrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + Assert.assertEquals(prevCount, 1); + } + + @Test(dependsOnMethods = "testIncreaseCount") + public void testDecreaseCount() throws Exception { + this.quotaManager.decrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT); + Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT), 1); + + this.quotaManager.decrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT); + Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT), 0); + + this.quotaManager.decrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT); + Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT), 0); + + this.quotaManager.decrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), 1); + this.quotaManager.decrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + 
Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), -1); + } + + class ChangeCountRunnable implements Runnable { + boolean increaseOrDecrease; + + public ChangeCountRunnable(boolean increaseOrDecrease) { + this.increaseOrDecrease = increaseOrDecrease; + } + + @Override + public void run() { + int i = 0; + while (i++ < 1000) { + try { + if (increaseOrDecrease) { + MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT); + } else { + MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT); + } + } catch (IOException e) { + Assert.fail("Thread got an exception.", e); + } + } + } + } + + @Test(dependsOnMethods = "testDecreaseCount") + public void testConcurrentChanges() throws IOException, InterruptedException { + Runnable increaseCountRunnable = new ChangeCountRunnable(true); + Runnable decreaseCountRunnable = new ChangeCountRunnable(false); + Thread thread1 = new Thread(increaseCountRunnable); + Thread thread2 = new Thread(increaseCountRunnable); + Thread thread3 = new Thread(increaseCountRunnable); + Thread thread4 = new Thread(decreaseCountRunnable); + Thread thread5 = new Thread(decreaseCountRunnable); + Thread thread6 = new Thread(decreaseCountRunnable); + + thread1.start(); + thread2.start(); + thread3.start(); + thread1.join(); + thread2.join(); + thread3.join(); + Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT), 3000); + thread4.start(); + thread5.start(); + thread6.start(); + thread4.join(); + thread5.join(); + thread6.join(); + Assert.assertEquals(this.quotaManager.getCount(TEST_NAME, AbstractUserQuotaManager.CountType.USER_COUNT), -1); Review Comment: why does this become -1, should it decrease by 3? 
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * An abstract implementation of {@link UserQuotaManager} that + */ +@Slf4j +abstract public class AbstractUserQuotaManager implements UserQuotaManager { + public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota"; + public static final 
String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota"; + public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota"; + public static final String QUOTA_SEPERATOR = ":"; + public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE; + private final Map<String, Integer> perUserQuota; + private final Map<String, Integer> perFlowGroupQuota; + Set<String> runningDagIds = ConcurrentHashMap.newKeySet(); + private final int defaultQuota; + + public AbstractUserQuotaManager(Config config) { + this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA); + ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder(); + ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder(); + // Quotas will take form of user:<Quota> and flowGroup:<Quota> + for (String flowGroupQuota : ConfigUtils.getStringList(config, PER_FLOWGROUP_QUOTA)) { + flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1])); + } + // Keep quotas per user as well in form user:<Quota> which apply for all flowgroups + for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) { + userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1])); + } + this.perUserQuota = userMapBuilder.build(); + this.perFlowGroupQuota = flowGroupMapBuilder.build(); + } + + abstract int incrementJobCount(String key, CountType countType) throws IOException; + + abstract void decrementJobCount(String user, CountType countType) throws IOException; + + public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException { + // Dag is already being tracked, no need to double increment for retries and multihop flows + if (isDagCurrentlyRunning(dagNode)) { + return; + } + String proxyUser = 
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); + String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), + ConfigurationKeys.FLOW_GROUP_KEY, ""); + String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode); + boolean proxyUserCheck = true; + Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased + StringBuilder requesterMessage = new StringBuilder(); + runningDagIds.add(DagManagerUtils.generateDagId(dagNode)); + if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) { + int proxyQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT); + proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds + if (!proxyUserCheck) { + // add 1 to proxyUserIncrement since count starts at 0, and is negative if quota is exceeded + requesterMessage.append(String.format( + "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n", + proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser))); + } + } + + String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); + boolean requesterCheck = true; + + if (serializedRequesters != null && dagNode.getValue().getCurrentAttempts() <= 1) { + List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters); + for (String requester : uniqueRequesters) { + int userQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT); + boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds + usersQuotaIncrement.add(requester); + requesterCheck = requesterCheck && thisRequesterCheck; + if (!thisRequesterCheck) { + 
requesterMessage.append(String.format( + "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n", + requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)-getQuotaForUser(requester))); + } + } + } + + boolean flowGroupCheck = true; + if (dagNode.getValue().getCurrentAttempts() <= 1) { + int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOW_COUNT); + flowGroupCheck = flowGroupQuotaIncrement >= 0; + if (!flowGroupCheck) { + requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n", + flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), + Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup))); + } + } + + // Throw errors for reach quota at the end to avoid inconsistent job counts Review Comment: nice! this is a good catch ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * An abstract implementation of {@link UserQuotaManager} that + */ +@Slf4j +abstract public class AbstractUserQuotaManager implements UserQuotaManager { + public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota"; + public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota"; + public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota"; + public static final String QUOTA_SEPERATOR = ":"; + public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE; + private final Map<String, Integer> perUserQuota; + private final Map<String, Integer> perFlowGroupQuota; + Set<String> runningDagIds = ConcurrentHashMap.newKeySet(); + private final int defaultQuota; + + public AbstractUserQuotaManager(Config config) { + this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA); Review Comment: where is this being used ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.inject.Singleton; +import lombok.extern.slf4j.Slf4j; + + +/** + * An implementation of {@link UserQuotaManager} that stores quota usage in memory. + */ +@Slf4j +@Singleton +public class InMemoryUserQuotaManager extends AbstractUserQuotaManager { + private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>(); + private final Map<String, Integer> flowGroupToJobCount = new ConcurrentHashMap<>(); + private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>(); + + @Inject + public InMemoryUserQuotaManager(Config config) { + super(config); + } + + private int incrementJobCount(String key, Map<String, Integer> quotaMap) { + Integer currentCount; + // Modifications must be thread safe since DAGs on DagManagerThreads may update the quota for the same user + do { + currentCount = quotaMap.get(key); + } while (currentCount == null ? 
quotaMap.putIfAbsent(key, 1) != null : !quotaMap.replace(key, currentCount, currentCount + 1)); + + if (currentCount == null) { + currentCount = 0; + } + + return currentCount; + } + + private void decrementJobCount(String key, Map<String, Integer> quotaMap) { + Integer currentCount; + if (key == null) { + return; + } + do { + currentCount = quotaMap.get(key); + } while (currentCount != null && currentCount > 0 && !quotaMap.replace(key, currentCount, currentCount - 1)); Review Comment: let's add an exception or ERROR log for the case where decrement job count is called and `currentCount` is 0 or negative. We had seen some negative flow counts after host restart that were unexpected, so we want to be aware of odd behavior ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; +import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * An abstract implementation of {@link UserQuotaManager} that + */ +@Slf4j +abstract public class AbstractUserQuotaManager implements UserQuotaManager { + public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota"; + public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota"; + public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota"; + public static final String QUOTA_SEPERATOR = ":"; + public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE; + private final Map<String, Integer> perUserQuota; + private final Map<String, Integer> perFlowGroupQuota; + Set<String> runningDagIds = ConcurrentHashMap.newKeySet(); + private final int defaultQuota; + + public AbstractUserQuotaManager(Config config) { + this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA); + ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder(); + ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder(); + // Quotas will take form of user:<Quota> and flowGroup:<Quota> + for (String flowGroupQuota : ConfigUtils.getStringList(config, 
PER_FLOWGROUP_QUOTA)) { + flowGroupMapBuilder.put(flowGroupQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(flowGroupQuota.split(QUOTA_SEPERATOR)[1])); + } + // Keep quotas per user as well in form user:<Quota> which apply for all flowgroups + for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) { + userMapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1])); + } + this.perUserQuota = userMapBuilder.build(); + this.perFlowGroupQuota = flowGroupMapBuilder.build(); + } + + abstract int incrementJobCount(String key, CountType countType) throws IOException; + + abstract void decrementJobCount(String user, CountType countType) throws IOException; + + public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException { + // Dag is already being tracked, no need to double increment for retries and multihop flows + if (isDagCurrentlyRunning(dagNode)) { + return; + } + String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); + String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), + ConfigurationKeys.FLOW_GROUP_KEY, ""); + String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode); + boolean proxyUserCheck = true; + Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased + StringBuilder requesterMessage = new StringBuilder(); + runningDagIds.add(DagManagerUtils.generateDagId(dagNode)); + if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) { + int proxyQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT); + proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds + if (!proxyUserCheck) { + // add 1 to proxyUserIncrement since count starts at 0, and is negative if quota is exceeded 
+ requesterMessage.append(String.format( + "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n", + proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)+1-getQuotaForUser(proxyUser))); + } + } + + String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); + boolean requesterCheck = true; + + if (serializedRequesters != null && dagNode.getValue().getCurrentAttempts() <= 1) { + List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters); + for (String requester : uniqueRequesters) { + int userQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT); + boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds + usersQuotaIncrement.add(requester); + requesterCheck = requesterCheck && thisRequesterCheck; + if (!thisRequesterCheck) { + requesterMessage.append(String.format( + "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n", Review Comment: for the requester check, shouldn't we also add 1, as the proxy user message above does? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; + +public class MysqlUserQuotaManagerTest { + private static final String USER = "testUser"; + private static final String PASSWORD = "testPassword"; + private static final String TABLE = "quotas"; + private static final String TEST_NAME = "abora"; Review Comment: rename this variable to make it more clear if this is the flow's name or user/requester/flowGroup name -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
