zhoujinsong commented on code in PR #3079:
URL: https://github.com/apache/amoro/pull/3079#discussion_r1706856129
##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -251,32 +255,94 @@ public Blocker block(
TableIdentifier tableIdentifier,
List<BlockableOperation> operations,
Map<String, String> properties) {
- checkStarted();
- return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
- .block(operations, properties, blockerTimeout)
- .buildBlocker();
+ Preconditions.checkNotNull(operations, "operations should not be null");
+ Preconditions.checkArgument(!operations.isEmpty(), "operations should not
be empty");
+ Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must >
0");
+ String catalog = tableIdentifier.getCatalog();
+ String database = tableIdentifier.getDatabase();
+ String table = tableIdentifier.getTableName();
+ int tryCount = 0;
+ while (tryCount++ < 3) {
Review Comment:
How about declaring a static constant for the maximum number of retries?
##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -251,32 +255,94 @@ public Blocker block(
TableIdentifier tableIdentifier,
List<BlockableOperation> operations,
Map<String, String> properties) {
- checkStarted();
- return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
- .block(operations, properties, blockerTimeout)
- .buildBlocker();
+ Preconditions.checkNotNull(operations, "operations should not be null");
+ Preconditions.checkArgument(!operations.isEmpty(), "operations should not
be empty");
+ Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must >
0");
+ String catalog = tableIdentifier.getCatalog();
+ String database = tableIdentifier.getDatabase();
+ String table = tableIdentifier.getTableName();
+ int tryCount = 0;
+ while (tryCount++ < 3) {
+ long now = System.currentTimeMillis();
+ doAs(
+ TableBlockerMapper.class,
+ mapper -> mapper.deleteExpiredBlockers(catalog, database, table,
now));
+ List<TableBlocker> tableBlockers =
+ getAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.selectBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ now));
+ if (TableBlocker.conflict(operations, tableBlockers)) {
+ throw new BlockerConflictException(operations + " is conflict with " +
tableBlockers);
+ }
+ Optional<Long> maxBlockerOpt =
+ tableBlockers.stream()
+ .map(TableBlocker::getBlockerId)
+ .max(Comparator.comparingLong(l -> l));
+ long prevBlockerId = maxBlockerOpt.orElse(-1L);
+
+ TableBlocker tableBlocker =
+ TableBlocker.buildTableBlocker(
+ tableIdentifier, operations, properties, now, blockerTimeout,
prevBlockerId);
+ try {
+ doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
+ if (tableBlocker.getBlockerId() > 0) {
+ return tableBlocker.buildBlocker();
+ }
+ } catch (PersistenceException e) {
+ LOG.warn(
+ "Exception when create a blocker:{}, error message:{}",
tableBlocker, e.getMessage());
+ if (e.getMessage() == null || !e.getMessage().contains("duplicate
key")) {
+ LOG.error("Exception when create a blocker:{}", tableBlocker, e);
Review Comment:
BTW, the error message has some grammatical errors.
"An exception occurs when creating a blocker" would be more appropriate.
##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -251,32 +255,94 @@ public Blocker block(
TableIdentifier tableIdentifier,
List<BlockableOperation> operations,
Map<String, String> properties) {
- checkStarted();
- return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
- .block(operations, properties, blockerTimeout)
- .buildBlocker();
+ Preconditions.checkNotNull(operations, "operations should not be null");
+ Preconditions.checkArgument(!operations.isEmpty(), "operations should not
be empty");
+ Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must >
0");
+ String catalog = tableIdentifier.getCatalog();
+ String database = tableIdentifier.getDatabase();
+ String table = tableIdentifier.getTableName();
+ int tryCount = 0;
+ while (tryCount++ < 3) {
+ long now = System.currentTimeMillis();
+ doAs(
+ TableBlockerMapper.class,
+ mapper -> mapper.deleteExpiredBlockers(catalog, database, table,
now));
+ List<TableBlocker> tableBlockers =
+ getAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.selectBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ now));
+ if (TableBlocker.conflict(operations, tableBlockers)) {
+ throw new BlockerConflictException(operations + " is conflict with " +
tableBlockers);
+ }
+ Optional<Long> maxBlockerOpt =
+ tableBlockers.stream()
+ .map(TableBlocker::getBlockerId)
+ .max(Comparator.comparingLong(l -> l));
+ long prevBlockerId = maxBlockerOpt.orElse(-1L);
+
+ TableBlocker tableBlocker =
+ TableBlocker.buildTableBlocker(
+ tableIdentifier, operations, properties, now, blockerTimeout,
prevBlockerId);
+ try {
+ doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
+ if (tableBlocker.getBlockerId() > 0) {
+ return tableBlocker.buildBlocker();
+ }
+ } catch (PersistenceException e) {
+ LOG.warn(
+ "Exception when create a blocker:{}, error message:{}",
tableBlocker, e.getMessage());
+ if (e.getMessage() == null || !e.getMessage().contains("duplicate
key")) {
+ LOG.error("Exception when create a blocker:{}", tableBlocker, e);
Review Comment:
Should we throw this exception rather than retrying when the error is not
a duplicate-key error?
##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -251,32 +255,94 @@ public Blocker block(
TableIdentifier tableIdentifier,
List<BlockableOperation> operations,
Map<String, String> properties) {
- checkStarted();
- return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
- .block(operations, properties, blockerTimeout)
- .buildBlocker();
+ Preconditions.checkNotNull(operations, "operations should not be null");
+ Preconditions.checkArgument(!operations.isEmpty(), "operations should not
be empty");
+ Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must >
0");
+ String catalog = tableIdentifier.getCatalog();
+ String database = tableIdentifier.getDatabase();
+ String table = tableIdentifier.getTableName();
+ int tryCount = 0;
+ while (tryCount++ < 3) {
+ long now = System.currentTimeMillis();
+ doAs(
+ TableBlockerMapper.class,
+ mapper -> mapper.deleteExpiredBlockers(catalog, database, table,
now));
+ List<TableBlocker> tableBlockers =
+ getAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.selectBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ now));
+ if (TableBlocker.conflict(operations, tableBlockers)) {
+ throw new BlockerConflictException(operations + " is conflict with " +
tableBlockers);
+ }
+ Optional<Long> maxBlockerOpt =
+ tableBlockers.stream()
+ .map(TableBlocker::getBlockerId)
+ .max(Comparator.comparingLong(l -> l));
+ long prevBlockerId = maxBlockerOpt.orElse(-1L);
+
+ TableBlocker tableBlocker =
+ TableBlocker.buildTableBlocker(
+ tableIdentifier, operations, properties, now, blockerTimeout,
prevBlockerId);
+ try {
+ doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
+ if (tableBlocker.getBlockerId() > 0) {
+ return tableBlocker.buildBlocker();
+ }
+ } catch (PersistenceException e) {
+ LOG.warn(
+ "Exception when create a blocker:{}, error message:{}",
tableBlocker, e.getMessage());
+ if (e.getMessage() == null || !e.getMessage().contains("duplicate
key")) {
+ LOG.error("Exception when create a blocker:{}", tableBlocker, e);
+ }
+ }
+ }
+ throw new BlockerConflictException("Failed to create a blocker: conflict
meet max retry");
}
@Override
public void releaseBlocker(TableIdentifier tableIdentifier, String
blockerId) {
- checkStarted();
- TableRuntime tableRuntime =
getRuntime(getServerTableIdentifier(tableIdentifier));
- if (tableRuntime != null) {
- tableRuntime.release(blockerId);
- }
+ doAs(TableBlockerMapper.class, mapper ->
mapper.deleteBlocker(Long.parseLong(blockerId)));
}
@Override
public long renewBlocker(TableIdentifier tableIdentifier, String blockerId) {
- checkStarted();
- TableRuntime tableRuntime =
getAndCheckExist(getServerTableIdentifier(tableIdentifier));
- return tableRuntime.renew(blockerId, blockerTimeout);
+ int retry = 0;
+ while (retry++ < 3) {
Review Comment:
Same issue here: the retry count is a literal value.
##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -251,32 +255,94 @@ public Blocker block(
TableIdentifier tableIdentifier,
List<BlockableOperation> operations,
Map<String, String> properties) {
- checkStarted();
- return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
- .block(operations, properties, blockerTimeout)
- .buildBlocker();
+ Preconditions.checkNotNull(operations, "operations should not be null");
+ Preconditions.checkArgument(!operations.isEmpty(), "operations should not
be empty");
+ Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must >
0");
+ String catalog = tableIdentifier.getCatalog();
+ String database = tableIdentifier.getDatabase();
+ String table = tableIdentifier.getTableName();
+ int tryCount = 0;
+ while (tryCount++ < 3) {
+ long now = System.currentTimeMillis();
+ doAs(
+ TableBlockerMapper.class,
+ mapper -> mapper.deleteExpiredBlockers(catalog, database, table,
now));
+ List<TableBlocker> tableBlockers =
+ getAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.selectBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ now));
+ if (TableBlocker.conflict(operations, tableBlockers)) {
+ throw new BlockerConflictException(operations + " is conflict with " +
tableBlockers);
+ }
+ Optional<Long> maxBlockerOpt =
+ tableBlockers.stream()
+ .map(TableBlocker::getBlockerId)
+ .max(Comparator.comparingLong(l -> l));
+ long prevBlockerId = maxBlockerOpt.orElse(-1L);
+
+ TableBlocker tableBlocker =
+ TableBlocker.buildTableBlocker(
+ tableIdentifier, operations, properties, now, blockerTimeout,
prevBlockerId);
+ try {
+ doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
+ if (tableBlocker.getBlockerId() > 0) {
+ return tableBlocker.buildBlocker();
+ }
+ } catch (PersistenceException e) {
+ LOG.warn(
Review Comment:
Please log the full stack trace rather than only the message.
##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -251,32 +255,94 @@ public Blocker block(
TableIdentifier tableIdentifier,
List<BlockableOperation> operations,
Map<String, String> properties) {
- checkStarted();
- return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
- .block(operations, properties, blockerTimeout)
- .buildBlocker();
+ Preconditions.checkNotNull(operations, "operations should not be null");
+ Preconditions.checkArgument(!operations.isEmpty(), "operations should not
be empty");
+ Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must >
0");
+ String catalog = tableIdentifier.getCatalog();
+ String database = tableIdentifier.getDatabase();
+ String table = tableIdentifier.getTableName();
+ int tryCount = 0;
+ while (tryCount++ < 3) {
+ long now = System.currentTimeMillis();
+ doAs(
+ TableBlockerMapper.class,
+ mapper -> mapper.deleteExpiredBlockers(catalog, database, table,
now));
Review Comment:
I am not sure if it is necessary to delete expired blockers every time
blocking is called.
It may increase the time cost.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]