umustafi commented on code in PR #3550:
URL: https://github.com/apache/gobblin/pull/3550#discussion_r981844268
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java:
##########
@@ -256,4 +283,70 @@ public void decreaseCount(String name, CountType
countType) throws IOException,
}
}
}
+
+ static class RunningDagIdsStore {
+ protected final DataSource dataSource;
+ final String tableName;
+ private final String CONTAINS_DAG_ID;
+ private final String ADD_DAG_ID;
+ private final String REMOVE_DAG_ID;
+
+ public RunningDagIdsStore(BasicDataSource dataSource, String tableName)
+ throws IOException {
+ this.dataSource = dataSource;
+ this.tableName = tableName;
+
+ CONTAINS_DAG_ID = "SELECT EXISTS(SELECT * FROM " + tableName + " WHERE
dagId = ?)" ;
+ ADD_DAG_ID = "INSERT INTO " + tableName + " (dagId) VALUES (?) ";
+ REMOVE_DAG_ID = "DELETE FROM " + tableName + " WHERE dagId = ?";
+
+ String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + "
(dagId VARCHAR(500) CHARACTER SET latin1 NOT NULL, "
+ + "PRIMARY KEY (dagId), UNIQUE INDEX ind (dagId))";
+ try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement =
connection.prepareStatement(createQuotaTable)) {
+ createStatement.executeUpdate();
+ } catch (SQLException e) {
+ throw new IOException("Failure creation table " + tableName, e);
Review Comment:
can you also log if the validation query is set in case connection is
refused?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -181,6 +190,18 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+
+ if (this.warmStandbyEnabled &&
+
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
|| PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+ try {
+ // todo : we should probably check quota for all of the start nodes
+ userQuotaManager.checkQuota(dag.getNodes().get(0));
Review Comment:
we should iterate over and check all startNodes like we discussed right.
Only if we have capacity for all start nodes, we add this dag.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]