ZihanLi58 commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r984836918
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -47,55 +50,219 @@
@Slf4j
@Singleton
public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
/** Store holding the per-key job counts charged and released by this manager. */
public final MysqlQuotaStore quotaStore;
/** Store of dag ids whose quota has already been charged, used to avoid double counting. */
public final RunningDagIdsStore runningDagIds;

/**
 * Creates the manager and both of its backing stores from {@code config}.
 *
 * @param config service configuration; also handed to the superclass constructor
 * @throws IOException propagated from the superclass or from the store factories
 */
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
  super(config);
  // The quota store is created first, then the running-dag store.
  quotaStore = createQuotaStore(config);
  runningDagIds = createRunningDagStore(config);
}
+
+ void addDagId(Connection connection, String dagId) throws IOException {
+ this.runningDagIds.add(connection, dagId);
+ }
+
+ @Override
+ boolean containsDagId(String dagId) throws IOException {
+ return this.runningDagIds.contains(dagId);
+ }
+
+ boolean removeDagId(Connection connection, String dagId) throws IOException {
+ return this.runningDagIds.remove(connection, dagId);
}
// This implementation does not need to update quota usage when the service
restarts or it's leadership status changes
public void init(Collection<Dag<JobExecutionPlan>> dags) {
}
@Override
- int incrementJobCount(String user, CountType countType) throws IOException {
- try {
- return this.mysqlStore.increaseCount(user, countType);
+ public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes)
throws IOException {
+ try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+ connection.setAutoCommit(false);
+
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
+ QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
+ if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck ||
!quotaCheck.flowGroupCheck)) {
+ connection.rollback();
+ throw new QuotaExceededException(quotaCheck.requesterMessage);
+ }
+ }
+ connection.commit();
} catch (SQLException e) {
throw new IOException(e);
}
}
- @Override
- void decrementJobCount(String user, CountType countType) throws IOException {
+ int incrementJobCount(Connection connection, String user, CountType
countType) throws IOException, SQLException {
+ return this.quotaStore.increaseCount(connection, user, countType);
+ }
+
+ void decrementJobCount(Connection connection,String user, CountType
countType) throws IOException, SQLException {
+ this.quotaStore.decreaseCount(connection, user, countType);
+ }
+
+ protected QuotaCheck increaseAndCheckQuota(Connection connection,
Dag.DagNode<JobExecutionPlan> dagNode)
+ throws SQLException, IOException {
+ QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+ StringBuilder requesterMessage = new StringBuilder();
+
+ // Dag is already being tracked, no need to double increment for retries
and multihop flows
+ if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+ return quotaCheck;
+ } else {
+ addDagId(connection, DagManagerUtils.generateDagId(dagNode).toString());
+ }
+
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ boolean proxyUserCheck;
+
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check
succeeds
+ quotaCheck.setProxyUserCheck(proxyUserCheck);
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count
before the increment
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s,
requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser),
Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters =
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(connection,
DagManagerUtils.getUserQuotaKey(requester, dagNode),
+ getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota
check succeeds
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ quotaCheck.setRequesterCheck(requesterCheck);
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format("Quota exceeded for requester
%s on executor %s : quota=%s, requests above quota=%d%n. ",
+ requester, specExecutorUri, getQuotaForUser(requester),
Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+ }
+ }
+ }
+
+ boolean flowGroupCheck;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection,
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ quotaCheck.setFlowGroupCheck(flowGroupCheck);
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format("Quota exceeded for flowgroup %s
on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+ Math.abs(flowGroupQuotaIncrement) + 1 -
getQuotaForFlowGroup(flowGroup)));
+ }
+ }
+
+ quotaCheck.setRequesterMessage(requesterMessage.toString());
+
+ return quotaCheck;
+ }
+
+ protected int incrementJobCountAndCheckQuota(Connection connection, String
key, int keyQuota, CountType countType)
+ throws IOException, SQLException {
+ int currentCount = incrementJobCount(connection, key, countType);
+ if (currentCount >= keyQuota) {
+ return -currentCount;
+ } else {
+ return currentCount;
+ }
+ }
+
+ /**
+ * Decrement the quota by one for the proxy user and requesters
corresponding to the provided {@link Dag.DagNode}.
+ * Returns true if the dag existed in the set of running dags and was
removed successfully
+ */
+ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException {
+ Connection connection;
try {
- this.mysqlStore.decreaseCount(user, countType);
+ connection = this.quotaStore.dataSource.getConnection();
+ connection.setAutoCommit(false);
} catch (SQLException e) {
throw new IOException(e);
}
+
+ try {
+ boolean val = removeDagId(connection,
DagManagerUtils.generateDagId(dagNode).toString());
+ if (!val) {
+ return false;
+ }
+
+ String proxyUser =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
AzkabanProjectConfig.USER_TO_PROXY, null);
+ if (proxyUser != null) {
+ String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser,
dagNode);
+ decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
+ }
+
+ String flowGroup =
+ ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
+ decrementJobCount(connection,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
CountType.FLOWGROUP_COUNT);
+
+ String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
+ try {
+ for (String requester :
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+ String requesterKey = DagManagerUtils.getUserQuotaKey(requester,
dagNode);
+ decrementJobCount(connection, requesterKey,
CountType.REQUESTER_COUNT);
+ }
+ } catch (IOException e) {
+ log.error("Failed to release quota for requester list " +
serializedRequesters, e);
+ return false;
+ }
+ } catch (SQLException ex) {
+ throw new IOException(ex);
+ } finally {
+ try {
+ connection.close();
Review Comment:
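Where did you commit the connection? `releaseQuota` calls `connection.setAutoCommit(false)`, but nothing in the try block calls `connection.commit()` before the `finally` closes the connection. The dag-id removal and the count decrements may therefore never be persisted, because JDBC leaves the outcome of closing a connection with an open transaction implementation-defined. The early `return false` paths should also roll back explicitly.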
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]