baiyangtx commented on code in PR #3079:
URL: https://github.com/apache/amoro/pull/3079#discussion_r1713717024


##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -251,32 +255,94 @@ public Blocker block(
       TableIdentifier tableIdentifier,
       List<BlockableOperation> operations,
       Map<String, String> properties) {
-    checkStarted();
-    return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
-        .block(operations, properties, blockerTimeout)
-        .buildBlocker();
+    Preconditions.checkNotNull(operations, "operations should not be null");
+    Preconditions.checkArgument(!operations.isEmpty(), "operations should not 
be empty");
+    Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must > 
0");
+    String catalog = tableIdentifier.getCatalog();
+    String database = tableIdentifier.getDatabase();
+    String table = tableIdentifier.getTableName();
+    int tryCount = 0;
+    while (tryCount++ < 3) {
+      long now = System.currentTimeMillis();
+      doAs(
+          TableBlockerMapper.class,
+          mapper -> mapper.deleteExpiredBlockers(catalog, database, table, 
now));
+      List<TableBlocker> tableBlockers =
+          getAs(
+              TableBlockerMapper.class,
+              mapper ->
+                  mapper.selectBlockers(
+                      tableIdentifier.getCatalog(),
+                      tableIdentifier.getDatabase(),
+                      tableIdentifier.getTableName(),
+                      now));
+      if (TableBlocker.conflict(operations, tableBlockers)) {
+        throw new BlockerConflictException(operations + " is conflict with " + 
tableBlockers);
+      }
+      Optional<Long> maxBlockerOpt =
+          tableBlockers.stream()
+              .map(TableBlocker::getBlockerId)
+              .max(Comparator.comparingLong(l -> l));
+      long prevBlockerId = maxBlockerOpt.orElse(-1L);
+
+      TableBlocker tableBlocker =
+          TableBlocker.buildTableBlocker(
+              tableIdentifier, operations, properties, now, blockerTimeout, 
prevBlockerId);
+      try {
+        doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
+        if (tableBlocker.getBlockerId() > 0) {
+          return tableBlocker.buildBlocker();
+        }
+      } catch (PersistenceException e) {
+        LOG.warn(
+            "Exception when create a blocker:{}, error message:{}", 
tableBlocker, e.getMessage());
+        if (e.getMessage() == null || !e.getMessage().contains("duplicate 
key")) {
+          LOG.error("Exception when create a blocker:{}", tableBlocker, e);

Review Comment:
   I'm not sure if all divers will throw this format of exception. I only 
tested the case of derby.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to