[
https://issues.apache.org/jira/browse/GOBBLIN-1703?focusedWorklogId=813532&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-813532
]
ASF GitHub Bot logged work on GOBBLIN-1703:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Sep/22 00:39
Start Date: 30/Sep/22 00:39
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r984119194
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -47,55 +50,261 @@
@Slf4j
@Singleton
public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
- private final MysqlQuotaStore mysqlStore;
+ public final MysqlQuotaStore quotaStore;
+ public final RunningDagIdsStore runningDagIds;
+
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
- this.mysqlStore = createQuotaStore(config);
+ this.quotaStore = createQuotaStore(config);
+ this.runningDagIds = createRunningDagStore(config);
+ }
+
+ void addDagId(Connection connection, String dagId) throws IOException {
+ this.runningDagIds.add(connection, dagId);
+ }
+
+ @Override
+ boolean containsDagId(String dagId) throws IOException {
+ return this.runningDagIds.contains(dagId);
+ }
+
+ boolean removeDagId(Connection connection, String dagId) throws IOException {
+ return this.runningDagIds.remove(connection, dagId);
}
// This implementation does not need to update quota usage when the service
restarts or it's leadership status changes
public void init(Collection<Dag<JobExecutionPlan>> dags) {
}
@Override
- int incrementJobCount(String user, CountType countType) throws IOException {
- try {
- return this.mysqlStore.increaseCount(user, countType);
+ public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes)
throws IOException {
+ try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+ connection.setAutoCommit(false);
+
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
+ QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
+ if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck ||
!quotaCheck.flowGroupCheck)) {
+ connection.rollback();
+ throw new QuotaExceededException(quotaCheck.requesterMessage);
+ }
+ }
+ connection.commit();
} catch (SQLException e) {
throw new IOException(e);
}
}
+ int incrementJobCount(Connection connection, String user, CountType
countType) throws IOException, SQLException {
+ return this.quotaStore.increaseCount(connection, user, countType);
+ }
+
+ void decrementJobCount(Connection connection,String user, CountType
countType) throws IOException, SQLException {
+ this.quotaStore.decreaseCount(connection, user, countType);
+ }
+
+ protected QuotaCheck increaseAndCheckQuota(Connection connection,
Dag.DagNode<JobExecutionPlan> dagNode)
+ throws SQLException, IOException {
+ QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+ StringBuilder requesterMessage = new StringBuilder();
+
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+ return quotaCheck;
+ } else {
+ addDagId(connection, DagManagerUtils.generateDagId(dagNode).toString());
+ }
+
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ boolean proxyUserCheck;
+
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ quotaCheck.setProxyUserCheck(proxyUserCheck);
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count
before the increment
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(connection,
DagManagerUtils.getUserQuotaKey(requester, dagNode),
+ getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ quotaCheck.setRequesterCheck(requesterCheck);
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format("Quota exceeded for requester
%s on executor %s : quota=%s, requests above quota=%d%n. ",
+ requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+ }
+ }
+ }
+
+ boolean flowGroupCheck;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ quotaCheck.setFlowGroupCheck(flowGroupCheck);
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format("Quota exceeded for flowgroup %s
on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+ Math.abs(flowGroupQuotaIncrement) + 1 -
getQuotaForFlowGroup(flowGroup)));
+ }
+ }
+
+ quotaCheck.setRequesterMessage(requesterMessage.toString());
+
+ return quotaCheck;
+ }
+
@Override
- void decrementJobCount(String user, CountType countType) throws IOException {
+ protected void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
Review Comment:
Do we need this method at all in the MySQL quota manager? It does not seem to
be called anywhere.
Issue Time Tracking
-------------------
Worklog Id: (was: 813532)
Time Spent: 1h 40m (was: 1.5h)
> avoid double quota usage increment for ad hoc flows
> ---------------------------------------------------
>
> Key: GOBBLIN-1703
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1703
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> When a GaaS flow request arrives, the resource handler checks the quota immediately.
> However, if the flow has runImmediately=true, the quota is checked again
> when the first job starts. This second check (and double increment) should be avoided.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)