umustafi commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1261809318
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws
IOException {
this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ String createArbiterStatement = String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
- PreparedStatement createStatement =
connection.prepareStatement(String.format(
- CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ PreparedStatement createStatement =
connection.prepareStatement(createArbiterStatement)) {
createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
- withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
- createStatement -> {
- int i = 0;
- createStatement.setInt(++i, epsilon);
- createStatement.setInt(++i, linger);
- return createStatement.executeUpdate();}, true);
+ String createConstantsStatement =
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(createConstantsStatement, createStatement ->
createStatement.executeUpdate(), true);
+
+ int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT,
this.constantsTableName), getStatement -> {
+ ResultSet resultSet = getStatement.executeQuery();
+ if (resultSet.next()) {
+ return resultSet.getInt(1);
+ }
+ return -1;
+ }, true);
+
+ // Only insert epsilon and linger values from config if this table does
not contain pre-existing values.
+ if (count == 0) {
+ String insertConstantsStatement =
String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(insertConstantsStatement, insertStatement -> {
+ int i = 0;
+ insertStatement.setInt(++i, epsilon);
+ insertStatement.setInt(++i, linger);
+ return insertStatement.executeUpdate();
+ }, true);
+ }
+
+ log.info("MysqlMultiActiveLeaseArbiter initialized");
}
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
// Check table for an existing entry for this flow action and event time
- ResultSet resultSet = withPreparedStatement(
+ GetEventInfoResult getResult = withPreparedStatement(
String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName),
getInfoStatement -> {
int i = 0;
- getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i,
flowAction.getFlowActionType().toString());
- return getInfoStatement.executeQuery();
+ ResultSet resultSet = getInfoStatement.executeQuery();
+ if (!resultSet.next()) {
+ return null;
+ }
+ return new GetEventInfoResult(resultSet);
}, true);
- String formattedSelectAfterInsertStatement =
- String.format(SELECT_AFTER_INSERT_STATEMENT,
this.leaseArbiterTableName, this.constantsTableName);
try {
// CASE 1: If no existing row for this flow action, then go ahead and
insert
- if (!resultSet.next()) {
+ if (getResult == null) {
+ log.debug("CASE 1: no existing row for this flow action, then go ahead
and insert");
Review Comment:
Let me move all of these to debug-level logs instead of comments. I am
explicitly numbering these cases here and referencing those numbers in
the unit tests. They are not defined elsewhere.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]