This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 78708b8f8 [AMORO-3078]: A lock-free TableBlocker implementation that 
no longer relies on TableRuntime. (#3079)
78708b8f8 is described below

commit 78708b8f80210da7dbd6b5e1dc130333e00c286c
Author: baiyangtx <[email protected]>
AuthorDate: Tue Aug 13 17:59:04 2024 +0800

    [AMORO-3078]: A lock-free TableBlocker implementation that no longer 
relies on TableRuntime. (#3079)
    
    * Fix unit tests
    
    * stash
    
    * ut is passed
    
    * spotless
    
    * fix reviewers
    
    * fix conflict
    
    * fix spotless
    
    * fix spotless
---
 .../amoro/server/catalog/InternalCatalog.java      |   5 +-
 .../amoro/server/persistence/PersistentBase.java   |  14 ++
 .../persistence/mapper/TableBlockerMapper.java     |  80 ++++++-----
 .../amoro/server/table/DefaultTableService.java    |  91 +++++++++++--
 .../apache/amoro/server/table/TableRuntime.java    | 147 ++-------------------
 .../amoro/server/table/blocker/TableBlocker.java   |  83 +++++++++++-
 .../table/executor/BlockerExpiringExecutor.java    |   6 +-
 .../src/main/resources/derby/ams-derby-init.sql    |   4 +-
 .../src/main/resources/mysql/ams-mysql-init.sql    |   3 +-
 .../resources/mysql/upgrade-0.6.1-to-0.7.0.sql     |   2 +-
 .../src/main/resources/mysql/upgrade.sql           |   6 +
 .../main/resources/postgres/ams-postgres-init.sql  |   8 +-
 .../resources/postgres/upgrade-0.6.1-to-0.7.0.sql  |   4 +-
 .../src/main/resources/postgres/upgrade.sql        |   7 +
 .../server/persistence/mapper/MySQLTestBase.java   |  21 ---
 .../executor/TestBlockerExpiringExecutor.java      |  27 +++-
 16 files changed, 280 insertions(+), 228 deletions(-)

diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
index 952e7b207..4bdfe2373 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
@@ -165,7 +165,10 @@ public abstract class InternalCatalog extends 
ServerCatalog {
             doAs(
                 TableMetaMapper.class,
                 mapper -> mapper.deleteTableMetaById(tableIdentifier.getId())),
-        () -> doAs(TableBlockerMapper.class, mapper -> 
mapper.deleteBlockers(tableIdentifier)),
+        () ->
+            doAs(
+                TableBlockerMapper.class,
+                mapper -> mapper.deleteTableBlockers(this.name(), 
databaseName, tableName)),
         () -> dropTableInternal(databaseName, tableName),
         () ->
             doAsExisted(
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
index 07e688793..8c9d00389 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
@@ -42,6 +42,20 @@ public abstract class PersistentBase {
                 .openSession(TransactionIsolationLevel.READ_COMMITTED));
   }
 
+  public final <T> Long updateAs(Class<T> mapperClz, Function<T, Number> 
updateFunction) {
+    try (NestedSqlSession session = beginSession()) {
+      try {
+        T mapper = getMapper(session, mapperClz);
+        Number number = updateFunction.apply(mapper);
+        session.commit();
+        return number.longValue();
+      } catch (Throwable t) {
+        session.rollback();
+        throw AmoroRuntimeException.wrap(t, PersistenceException::new);
+      }
+    }
+  }
+
   protected final <T> void doAs(Class<T> mapperClz, Consumer<T> consumer) {
     try (NestedSqlSession session = beginSession()) {
       try {
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
index 2b772cfa5..57e2ebd94 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
@@ -18,7 +18,6 @@
 
 package org.apache.amoro.server.persistence.mapper;
 
-import org.apache.amoro.api.ServerTableIdentifier;
 import org.apache.amoro.server.persistence.converter.List2StringConverter;
 import org.apache.amoro.server.persistence.converter.Long2TsConverter;
 import org.apache.amoro.server.persistence.converter.Map2StringConverter;
@@ -42,14 +41,15 @@ public interface TableBlockerMapper {
           + "expiration_time,properties FROM "
           + TABLE_NAME
           + " "
-          + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = 
#{tableIdentifier.database} "
-          + "AND table_name = #{tableIdentifier.tableName} "
+          + "WHERE catalog_name = #{catalog} "
+          + "AND db_name = #{database} "
+          + "AND table_name = #{tableName} "
           + "AND expiration_time > #{now, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
   @Results({
     @Result(property = "blockerId", column = "blocker_id"),
-    @Result(property = "tableIdentifier.catalog", column = "catalog_name"),
-    @Result(property = "tableIdentifier.database", column = "db_name"),
-    @Result(property = "tableIdentifier.tableName", column = "table_name"),
+    @Result(property = "catalog", column = "catalog_name"),
+    @Result(property = "database", column = "db_name"),
+    @Result(property = "tableName", column = "table_name"),
     @Result(
         property = "operations",
         column = "operations",
@@ -62,7 +62,10 @@ public interface TableBlockerMapper {
     @Result(property = "properties", column = "properties", typeHandler = 
Map2StringConverter.class)
   })
   List<TableBlocker> selectBlockers(
-      @Param("tableIdentifier") ServerTableIdentifier tableIdentifier, 
@Param("now") long now);
+      @Param("catalog") String catalog,
+      @Param("database") String database,
+      @Param("tableName") String tableName,
+      @Param("now") long now);
 
   @Select(
       "SELECT 
blocker_id,catalog_name,db_name,table_name,operations,create_time,"
@@ -73,9 +76,9 @@ public interface TableBlockerMapper {
           + "AND expiration_time > #{now, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
   @Results({
     @Result(property = "blockerId", column = "blocker_id"),
-    @Result(property = "tableIdentifier.catalog", column = "catalog_name"),
-    @Result(property = "tableIdentifier.database", column = "db_name"),
-    @Result(property = "tableIdentifier.tableName", column = "table_name"),
+    @Result(property = "catalog", column = "catalog_name"),
+    @Result(property = "database", column = "db_name"),
+    @Result(property = "tableName", column = "table_name"),
     @Result(
         property = "operations",
         column = "operations",
@@ -89,31 +92,34 @@ public interface TableBlockerMapper {
   })
   TableBlocker selectBlocker(@Param("blockerId") long blockerId, @Param("now") 
long now);
 
+  @Update(
+      "UPDATE "
+          + TABLE_NAME
+          + " "
+          + "SET "
+          + "expiration_time = #{expiration, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} "
+          + "WHERE blocker_id = #{blockerId} "
+          + "AND expiration_time > #{now, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
+  int renewBlocker(
+      @Param("blockerId") long blockerId,
+      @Param("now") long now,
+      @Param("expiration") long expiration);
+
   @Insert(
       "INSERT INTO "
           + TABLE_NAME
-          + " (catalog_name,db_name,table_name,operations,create_time,"
-          + "expiration_time,properties) VALUES ("
-          + "#{blocker.tableIdentifier.catalog},"
-          + "#{blocker.tableIdentifier.database},"
-          + "#{blocker.tableIdentifier.tableName},"
+          + 
"(catalog_name,db_name,table_name,operations,create_time,expiration_time,prev_blocker_id,properties)
 "
+          + "VALUES ( "
+          + "#{blocker.catalog},"
+          + "#{blocker.database},"
+          + "#{blocker.tableName},"
           + 
"#{blocker.operations,typeHandler=org.apache.amoro.server.persistence.converter.List2StringConverter},"
           + 
"#{blocker.createTime,typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
           + 
"#{blocker.expirationTime,typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
-          + 
"#{blocker.properties,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter}"
-          + ")")
+          + "#{blocker.prevBlockerId},"
+          + 
"#{blocker.properties,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter})")
   @Options(useGeneratedKeys = true, keyProperty = "blocker.blockerId")
-  void insertBlocker(@Param("blocker") TableBlocker blocker);
-
-  @Update(
-      "UPDATE "
-          + TABLE_NAME
-          + " SET "
-          + "expiration_time = #{expirationTime, "
-          + 
"typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} "
-          + "WHERE blocker_id = #{blockerId}")
-  void updateBlockerExpirationTime(
-      @Param("blockerId") long blockerId, @Param("expirationTime") long 
expirationTime);
+  int insert(@Param("blocker") TableBlocker blocker);
 
   @Delete("DELETE FROM " + TABLE_NAME + " " + "WHERE blocker_id = 
#{blockerId}")
   void deleteBlocker(@Param("blockerId") long blockerId);
@@ -122,17 +128,23 @@ public interface TableBlockerMapper {
       "DELETE FROM "
           + TABLE_NAME
           + " "
-          + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = 
#{tableIdentifier.database} "
-          + "AND table_name = #{tableIdentifier.tableName} "
+          + "WHERE catalog_name = #{catalog} AND db_name = #{database} "
+          + "AND table_name = #{tableName} "
           + "AND expiration_time <= #{now, 
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
   int deleteExpiredBlockers(
-      @Param("tableIdentifier") ServerTableIdentifier tableIdentifier, 
@Param("now") long now);
+      @Param("catalog") String catalog,
+      @Param("database") String database,
+      @Param("tableName") String tableName,
+      @Param("now") long now);
 
   @Delete(
       "DELETE FROM "
           + TABLE_NAME
           + " "
-          + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = 
#{tableIdentifier.database} "
-          + "AND table_name = #{tableIdentifier.tableName}")
-  int deleteBlockers(@Param("tableIdentifier") ServerTableIdentifier 
tableIdentifier);
+          + "WHERE catalog_name = #{catalog} AND db_name = #{database} "
+          + "AND table_name = #{tableName}")
+  int deleteTableBlockers(
+      @Param("catalog") String catalog,
+      @Param("database") String database,
+      @Param("tableName") String tableName);
 }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index b3266b9d7..9b54d18c9 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -35,12 +35,15 @@ import org.apache.amoro.server.catalog.ExternalCatalog;
 import org.apache.amoro.server.catalog.InternalCatalog;
 import org.apache.amoro.server.catalog.ServerCatalog;
 import org.apache.amoro.server.exception.AlreadyExistsException;
+import org.apache.amoro.server.exception.BlockerConflictException;
 import org.apache.amoro.server.exception.IllegalMetadataException;
 import org.apache.amoro.server.exception.ObjectNotExistsException;
+import org.apache.amoro.server.exception.PersistenceException;
 import org.apache.amoro.server.manager.MetricManager;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.persistence.StatedPersistentBase;
 import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
 import org.apache.amoro.server.table.blocker.TableBlocker;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
@@ -55,6 +58,7 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +79,7 @@ import java.util.stream.Collectors;
 public class DefaultTableService extends StatedPersistentBase implements 
TableService {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(DefaultTableService.class);
+  private static final int TABLE_BLOCKER_RETRY = 3;
   private final long externalCatalogRefreshingInterval;
   private final long blockerTimeout;
   private final Map<String, InternalCatalog> internalCatalogMap = new 
ConcurrentHashMap<>();
@@ -251,32 +256,90 @@ public class DefaultTableService extends 
StatedPersistentBase implements TableSe
       TableIdentifier tableIdentifier,
       List<BlockableOperation> operations,
       Map<String, String> properties) {
-    checkStarted();
-    return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
-        .block(operations, properties, blockerTimeout)
-        .buildBlocker();
+    Preconditions.checkNotNull(operations, "operations should not be null");
+    Preconditions.checkArgument(!operations.isEmpty(), "operations should not 
be empty");
+    Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must > 
0");
+    String catalog = tableIdentifier.getCatalog();
+    String database = tableIdentifier.getDatabase();
+    String table = tableIdentifier.getTableName();
+    int tryCount = 0;
+    while (tryCount++ < TABLE_BLOCKER_RETRY) {
+      long now = System.currentTimeMillis();
+      doAs(
+          TableBlockerMapper.class,
+          mapper -> mapper.deleteExpiredBlockers(catalog, database, table, 
now));
+      List<TableBlocker> tableBlockers =
+          getAs(
+              TableBlockerMapper.class,
+              mapper ->
+                  mapper.selectBlockers(
+                      tableIdentifier.getCatalog(),
+                      tableIdentifier.getDatabase(),
+                      tableIdentifier.getTableName(),
+                      now));
+      if (TableBlocker.conflict(operations, tableBlockers)) {
+        throw new BlockerConflictException(operations + " is conflict with " + 
tableBlockers);
+      }
+      Optional<Long> maxBlockerOpt =
+          tableBlockers.stream()
+              .map(TableBlocker::getBlockerId)
+              .max(Comparator.comparingLong(l -> l));
+      long prevBlockerId = maxBlockerOpt.orElse(-1L);
+
+      TableBlocker tableBlocker =
+          TableBlocker.buildTableBlocker(
+              tableIdentifier, operations, properties, now, blockerTimeout, 
prevBlockerId);
+      try {
+        doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
+        if (tableBlocker.getBlockerId() > 0) {
+          return tableBlocker.buildBlocker();
+        }
+      } catch (PersistenceException e) {
+        LOG.warn("An exception occurs when creating a blocker:{}", 
tableBlocker, e);
+      }
+    }
+    throw new BlockerConflictException("Failed to create a blocker: conflict 
meet max retry");
   }
 
   @Override
   public void releaseBlocker(TableIdentifier tableIdentifier, String 
blockerId) {
-    checkStarted();
-    TableRuntime tableRuntime = 
getRuntime(getServerTableIdentifier(tableIdentifier));
-    if (tableRuntime != null) {
-      tableRuntime.release(blockerId);
-    }
+    doAs(TableBlockerMapper.class, mapper -> 
mapper.deleteBlocker(Long.parseLong(blockerId)));
   }
 
   @Override
   public long renewBlocker(TableIdentifier tableIdentifier, String blockerId) {
-    checkStarted();
-    TableRuntime tableRuntime = 
getAndCheckExist(getServerTableIdentifier(tableIdentifier));
-    return tableRuntime.renew(blockerId, blockerTimeout);
+    int retry = 0;
+    while (retry++ < TABLE_BLOCKER_RETRY) {
+      long now = System.currentTimeMillis();
+      long id = Long.parseLong(blockerId);
+      TableBlocker tableBlocker =
+          getAs(TableBlockerMapper.class, mapper -> mapper.selectBlocker(id, 
now));
+      if (tableBlocker == null) {
+        throw new ObjectNotExistsException("Blocker " + blockerId + " of " + 
tableIdentifier);
+      }
+      long current = System.currentTimeMillis();
+      long expirationTime = now + blockerTimeout;
+      long effectRow =
+          updateAs(
+              TableBlockerMapper.class, mapper -> mapper.renewBlocker(id, 
current, expirationTime));
+      if (effectRow > 0) {
+        return expirationTime;
+      }
+    }
+    throw new BlockerConflictException("Failed to renew a blocker: conflict 
meet max retry");
   }
 
   @Override
   public List<Blocker> getBlockers(TableIdentifier tableIdentifier) {
-    checkStarted();
-    return 
getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier)).getBlockers().stream()
+    return getAs(
+            TableBlockerMapper.class,
+            mapper ->
+                mapper.selectBlockers(
+                    tableIdentifier.getCatalog(),
+                    tableIdentifier.getDatabase(),
+                    tableIdentifier.getTableName(),
+                    System.currentTimeMillis()))
+        .stream()
         .map(TableBlocker::buildBlocker)
         .collect(Collectors.toList());
   }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
index e97143133..95c68e08d 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
@@ -26,8 +26,6 @@ import org.apache.amoro.api.StateField;
 import org.apache.amoro.api.config.OptimizingConfig;
 import org.apache.amoro.api.config.TableConfiguration;
 import org.apache.amoro.server.AmoroServiceConstants;
-import org.apache.amoro.server.exception.BlockerConflictException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
 import org.apache.amoro.server.metrics.MetricRegistry;
 import org.apache.amoro.server.optimizing.OptimizingProcess;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
@@ -43,24 +41,18 @@ import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.table.MixedTable;
-import org.apache.amoro.table.blocker.RenewableBlocker;
 import org.apache.iceberg.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 public class TableRuntime extends StatedPersistentBase {
 
@@ -95,7 +87,6 @@ public class TableRuntime extends StatedPersistentBase {
   @StateField private volatile OptimizingEvaluator.PendingInput pendingInput;
   private volatile long lastPlanTime;
   private final TableOptimizingMetrics optimizingMetrics;
-  private final ReentrantLock blockerLock = new ReentrantLock();
 
   protected TableRuntime(
       ServerTableIdentifier tableIdentifier,
@@ -513,96 +504,6 @@ public class TableRuntime extends StatedPersistentBase {
         .doubleValue();
   }
 
-  /**
-   * Get all valid blockers.
-   *
-   * @return all valid blockers
-   */
-  public List<TableBlocker> getBlockers() {
-    blockerLock.lock();
-    try {
-      return getAs(
-          TableBlockerMapper.class,
-          mapper -> mapper.selectBlockers(tableIdentifier, 
System.currentTimeMillis()));
-    } finally {
-      blockerLock.unlock();
-    }
-  }
-
-  /**
-   * Block some operations for table.
-   *
-   * @param operations - operations to be blocked
-   * @param properties -
-   * @param blockerTimeout -
-   * @return TableBlocker if success
-   */
-  public TableBlocker block(
-      List<BlockableOperation> operations,
-      @Nonnull Map<String, String> properties,
-      long blockerTimeout) {
-    Preconditions.checkNotNull(operations, "operations should not be null");
-    Preconditions.checkArgument(!operations.isEmpty(), "operations should not 
be empty");
-    Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must > 
0");
-    blockerLock.lock();
-    try {
-      long now = System.currentTimeMillis();
-      List<TableBlocker> tableBlockers =
-          getAs(TableBlockerMapper.class, mapper -> 
mapper.selectBlockers(tableIdentifier, now));
-      if (conflict(operations, tableBlockers)) {
-        throw new BlockerConflictException(operations + " is conflict with " + 
tableBlockers);
-      }
-      TableBlocker tableBlocker =
-          buildTableBlocker(tableIdentifier, operations, properties, now, 
blockerTimeout);
-      doAs(TableBlockerMapper.class, mapper -> 
mapper.insertBlocker(tableBlocker));
-      return tableBlocker;
-    } finally {
-      blockerLock.unlock();
-    }
-  }
-
-  /**
-   * Renew blocker.
-   *
-   * @param blockerId - blockerId
-   * @param blockerTimeout - timeout
-   * @throws IllegalStateException if blocker not exist
-   */
-  public long renew(String blockerId, long blockerTimeout) {
-    blockerLock.lock();
-    try {
-      long now = System.currentTimeMillis();
-      TableBlocker tableBlocker =
-          getAs(
-              TableBlockerMapper.class,
-              mapper -> mapper.selectBlocker(Long.parseLong(blockerId), now));
-      if (tableBlocker == null) {
-        throw new ObjectNotExistsException("Blocker " + blockerId + " of " + 
tableIdentifier);
-      }
-      long expirationTime = now + blockerTimeout;
-      doAs(
-          TableBlockerMapper.class,
-          mapper -> 
mapper.updateBlockerExpirationTime(Long.parseLong(blockerId), expirationTime));
-      return expirationTime;
-    } finally {
-      blockerLock.unlock();
-    }
-  }
-
-  /**
-   * Release blocker, succeed when blocker not exist.
-   *
-   * @param blockerId - blockerId
-   */
-  public void release(String blockerId) {
-    blockerLock.lock();
-    try {
-      doAs(TableBlockerMapper.class, mapper -> 
mapper.deleteBlocker(Long.parseLong(blockerId)));
-    } finally {
-      blockerLock.unlock();
-    }
-  }
-
   /**
    * Check if operation are blocked now.
    *
@@ -610,43 +511,15 @@ public class TableRuntime extends StatedPersistentBase {
    * @return true if blocked
    */
   public boolean isBlocked(BlockableOperation operation) {
-    blockerLock.lock();
-    try {
-      List<TableBlocker> tableBlockers =
-          getAs(
-              TableBlockerMapper.class,
-              mapper -> mapper.selectBlockers(tableIdentifier, 
System.currentTimeMillis()));
-      return conflict(operation, tableBlockers);
-    } finally {
-      blockerLock.unlock();
-    }
-  }
-
-  private boolean conflict(
-      List<BlockableOperation> blockableOperations, List<TableBlocker> 
blockers) {
-    return blockableOperations.stream().anyMatch(operation -> 
conflict(operation, blockers));
-  }
-
-  private boolean conflict(BlockableOperation blockableOperation, 
List<TableBlocker> blockers) {
-    return blockers.stream()
-        .anyMatch(blocker -> 
blocker.getOperations().contains(blockableOperation.name()));
-  }
-
-  private TableBlocker buildTableBlocker(
-      ServerTableIdentifier tableIdentifier,
-      List<BlockableOperation> operations,
-      Map<String, String> properties,
-      long now,
-      long blockerTimeout) {
-    TableBlocker tableBlocker = new TableBlocker();
-    tableBlocker.setTableIdentifier(tableIdentifier);
-    tableBlocker.setCreateTime(now);
-    tableBlocker.setExpirationTime(now + blockerTimeout);
-    tableBlocker.setOperations(
-        
operations.stream().map(BlockableOperation::name).collect(Collectors.toList()));
-    HashMap<String, String> propertiesOfTableBlocker = new 
HashMap<>(properties);
-    propertiesOfTableBlocker.put(RenewableBlocker.BLOCKER_TIMEOUT, 
blockerTimeout + "");
-    tableBlocker.setProperties(propertiesOfTableBlocker);
-    return tableBlocker;
+    List<TableBlocker> tableBlockers =
+        getAs(
+            TableBlockerMapper.class,
+            mapper ->
+                mapper.selectBlockers(
+                    tableIdentifier.getCatalog(),
+                    tableIdentifier.getDatabase(),
+                    tableIdentifier.getTableName(),
+                    System.currentTimeMillis()));
+    return TableBlocker.conflict(operation, tableBlockers);
   }
 }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
index 8cb68d0a1..000b5054a 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
@@ -20,31 +20,89 @@ package org.apache.amoro.server.table.blocker;
 
 import org.apache.amoro.api.BlockableOperation;
 import org.apache.amoro.api.Blocker;
-import org.apache.amoro.api.ServerTableIdentifier;
+import org.apache.amoro.api.TableIdentifier;
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.table.blocker.RenewableBlocker;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 public class TableBlocker {
-  private ServerTableIdentifier tableIdentifier;
+  private String catalog;
+  private String database;
+  private String tableName;
   private long blockerId;
   private List<String> operations;
   private long createTime;
   private long expirationTime;
+  private long prevBlockerId;
   private Map<String, String> properties;
 
+  public static boolean conflict(
+      List<BlockableOperation> blockableOperations, List<TableBlocker> 
blockers) {
+    return blockableOperations.stream().anyMatch(operation -> 
conflict(operation, blockers));
+  }
+
+  public static boolean conflict(
+      BlockableOperation blockableOperation, List<TableBlocker> blockers) {
+    return blockers.stream()
+        .anyMatch(blocker -> 
blocker.getOperations().contains(blockableOperation.name()));
+  }
+
+  public static TableBlocker buildTableBlocker(
+      TableIdentifier tableIdentifier,
+      List<BlockableOperation> operations,
+      Map<String, String> properties,
+      long now,
+      long blockerTimeout,
+      long prevBlockerId) {
+    TableBlocker tableBlocker = new TableBlocker();
+    tableBlocker.setCatalog(tableIdentifier.getCatalog());
+    tableBlocker.setDatabase(tableIdentifier.getDatabase());
+    tableBlocker.setTableName(tableIdentifier.getTableName());
+    tableBlocker.setCreateTime(now);
+    tableBlocker.setExpirationTime(now + blockerTimeout);
+    tableBlocker.setOperations(
+        
operations.stream().map(BlockableOperation::name).collect(Collectors.toList()));
+    HashMap<String, String> propertiesOfTableBlocker = new 
HashMap<>(properties);
+    propertiesOfTableBlocker.put(RenewableBlocker.BLOCKER_TIMEOUT, 
blockerTimeout + "");
+    tableBlocker.setProperties(propertiesOfTableBlocker);
+    return tableBlocker;
+  }
+
   public TableBlocker() {}
 
-  public ServerTableIdentifier getTableIdentifier() {
-    return tableIdentifier;
+  public String getCatalog() {
+    return catalog;
+  }
+
+  public void setCatalog(String catalog) {
+    this.catalog = catalog;
+  }
+
+  public String getDatabase() {
+    return database;
   }
 
-  public void setTableIdentifier(ServerTableIdentifier tableIdentifier) {
-    this.tableIdentifier = tableIdentifier;
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public void setTableIdentifier(TableIdentifier tableIdentifier) {
+    this.catalog = tableIdentifier.getCatalog();
+    this.database = tableIdentifier.getDatabase();
+    this.tableName = tableIdentifier.getTableName();
   }
 
   public long getBlockerId() {
@@ -79,6 +137,14 @@ public class TableBlocker {
     this.expirationTime = expirationTime;
   }
 
+  public void setPrevBlockerId(long prevBlockerId) {
+    this.prevBlockerId = prevBlockerId;
+  }
+
+  public long getPrevBlockerId() {
+    return this.prevBlockerId;
+  }
+
   public Map<String, String> getProperties() {
     return properties;
   }
@@ -99,11 +165,14 @@ public class TableBlocker {
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
-        .add("tableIdentifier", tableIdentifier)
+        .add("catalog", catalog)
+        .add("database", database)
+        .add("tableName", tableName)
         .add("blockerId", blockerId)
         .add("operations", operations)
         .add("createTime", createTime)
         .add("expirationTime", expirationTime)
+        .add("prevBlockerId", prevBlockerId)
         .add("properties", properties)
         .toString();
   }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java
index 169819815..909d7c6a0 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java
@@ -55,11 +55,13 @@ public class BlockerExpiringExecutor extends 
BaseTableExecutor {
   private static class Persistency extends PersistentBase {
 
     public void doExpiring(TableRuntime tableRuntime) {
+      String catalog = tableRuntime.getTableIdentifier().getCatalog();
+      String database = tableRuntime.getTableIdentifier().getDatabase();
+      String table = tableRuntime.getTableIdentifier().getTableName();
       doAs(
           TableBlockerMapper.class,
           mapper ->
-              mapper.deleteExpiredBlockers(
-                  tableRuntime.getTableIdentifier(), 
System.currentTimeMillis()));
+              mapper.deleteExpiredBlockers(catalog, database, table, 
System.currentTimeMillis()));
     }
   }
 }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql 
b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql
index e7856a57d..a7dc2ecf1 100644
--- a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql
@@ -200,5 +200,7 @@ CREATE TABLE table_blocker (
   create_time timestamp DEFAULT NULL,
   expiration_time timestamp DEFAULT NULL,
   properties clob(64m),
-  PRIMARY KEY (blocker_id)
+  prev_blocker_id bigint NOT NULL DEFAULT -1,
+  PRIMARY KEY (blocker_id),
+  CONSTRAINT prev_uq UNIQUE (catalog_name, db_name, table_name, 
prev_blocker_id)
 );
diff --git 
a/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql 
b/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql
index 82fe7072d..45bc86092 100644
--- a/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql
@@ -219,6 +219,7 @@ CREATE TABLE `table_blocker` (
   `create_time` timestamp NULL DEFAULT NULL COMMENT 'Blocker create time',
   `expiration_time` timestamp NULL DEFAULT NULL COMMENT 'Blocker expiration 
time',
   `properties` mediumtext COMMENT 'Blocker properties',
+  `prev_blocker_id` bigint(20) NOT NULL DEFAULT -1 COMMENT 'prev blocker id 
when created',
   PRIMARY KEY (`blocker_id`),
-  KEY `table_index` (`catalog_name`,`db_name`,`table_name`)
+  UNIQUE KEY `uq_prev` (`catalog_name`,`db_name`,`table_name`, 
`prev_blocker_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Table blockers' 
ROW_FORMAT=DYNAMIC;
diff --git 
a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql
 
b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql
index 048919990..3fff47388 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql
+++ 
b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql
@@ -17,4 +17,4 @@ ALTER TABLE table_identifier CHANGE COLUMN table_name 
table_name varchar(256) NO
 ALTER TABLE table_optimizing_process CHANGE COLUMN table_name table_name 
varchar(256) NOT NULL;
 ALTER TABLE table_metadata CHANGE COLUMN table_name table_name varchar(256) 
NOT NULL;
 ALTER TABLE table_runtime CHANGE COLUMN table_name table_name varchar(256) NOT 
NULL;
-ALTER TABLE table_blocker CHANGE COLUMN table_name table_name varchar(256) NOT 
NULL;
\ No newline at end of file
+ALTER TABLE table_blocker CHANGE COLUMN table_name table_name varchar(256) NOT 
NULL;
diff --git a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql 
b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql
index 69f409555..a457d458d 100644
--- a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql
@@ -15,3 +15,9 @@
 
 -- If you have any changes to the AMS database, please record them in this 
file.
 -- We will confirm the corresponding version of these upgrade scripts when 
releasing.
+
+-- NEW SCHEMA CHANGE FOR CAS-BASED BLOCKER
+TRUNCATE TABLE `table_blocker`;
+ALTER TABLE `table_blocker` DROP INDEX `table_index`;
+ALTER TABLE `table_blocker` ADD COLUMN `prev_blocker_id` bigint(20) NOT NULL 
DEFAULT -1 COMMENT 'prev blocker id when created';
+ALTER TABLE `table_blocker` ADD UNIQUE KEY `uq_prev` 
(`catalog_name`,`db_name`,`table_name`, `prev_blocker_id`);
diff --git 
a/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql 
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql
index dd6f0e0dd..40688a5ed 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql
+++ 
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql
@@ -352,9 +352,10 @@ CREATE TABLE table_blocker
     operations VARCHAR(128) NOT NULL,
     create_time TIMESTAMP,
     expiration_time TIMESTAMP,
-    properties TEXT
+    properties TEXT,
+    prev_blocker_id BIGINT NOT NULL DEFAULT -1
 );
-CREATE INDEX blocker_index ON table_optimizing_process (catalog_name, db_name, 
table_name);
+CREATE UNIQUE INDEX uq_prev ON table_blocker (catalog_name, db_name, 
table_name, prev_blocker_id);
 
 COMMENT ON TABLE table_blocker IS 'Table blockers';
 COMMENT ON COLUMN table_blocker.blocker_id IS 'Blocker unique ID';
@@ -364,4 +365,5 @@ COMMENT ON COLUMN table_blocker.table_name IS 'Table name';
 COMMENT ON COLUMN table_blocker.operations IS 'Blocked operations';
 COMMENT ON COLUMN table_blocker.create_time IS 'Blocker create time';
 COMMENT ON COLUMN table_blocker.expiration_time IS 'Blocker expiration time';
-COMMENT ON COLUMN table_blocker.properties IS 'Blocker properties';
\ No newline at end of file
+COMMENT ON COLUMN table_blocker.properties IS 'Blocker properties';
+COMMENT ON COLUMN table_blocker.prev_blocker_id is 'prev blocker id when 
created';
diff --git 
a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql
 
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql
index c1bb51995..84cc86db2 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql
+++ 
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql
@@ -20,4 +20,6 @@ ALTER TABLE table_identifier ALTER COLUMN table_name TYPE 
varchar(256) NOT NULL;
 ALTER TABLE table_optimizing_process ALTER COLUMN table_name TYPE varchar(256) 
NOT NULL;
 ALTER TABLE table_metadata ALTER COLUMN table_name TYPE varchar(256) NOT NULL;
 ALTER TABLE table_runtime ALTER COLUMN table_name TYPE varchar(256) NOT NULL;
-ALTER TABLE table_blocker ALTER COLUMN table_name TYPE varchar(256) NOT NULL;
\ No newline at end of file
+ALTER TABLE table_blocker ALTER COLUMN table_name TYPE varchar(256) NOT NULL;
+
+
diff --git a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql 
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql
index 69f409555..0b07e8805 100644
--- a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql
@@ -15,3 +15,10 @@
 
 -- If you have any changes to the AMS database, please record them in this 
file.
 -- We will confirm the corresponding version of these upgrade scripts when 
releasing.
+
+-- NEW SCHEMA CHANGE FOR CAS-BASED BLOCKER
+TRUNCATE TABLE table_blocker;
+DROP INDEX IF EXISTS blocker_index;
+ALTER TABLE table_blocker ADD COLUMN prev_blocker_id bigint NOT NULL DEFAULT -1;
+COMMENT ON COLUMN table_blocker.prev_blocker_id IS 'prev blocker id when created';
+CREATE UNIQUE INDEX uq_prev ON table_blocker (catalog_name, db_name, table_name, prev_blocker_id);
diff --git 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java
 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java
deleted file mode 100644
index 0ca6d6ced..000000000
--- 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.persistence.mapper;
-
-public class MySQLTestBase {}
diff --git 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java
 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java
index 5a38cea30..ef87ea60f 100644
--- 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java
+++ 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java
@@ -55,19 +55,23 @@ public class TestBlockerExpiringExecutor extends 
TableServiceTestBase {
   public void testExpireBlocker() {
     BlockerExpiringExecutor blockerExpiringExecutor = new 
BlockerExpiringExecutor(tableManager);
     TableBlocker tableBlocker = new TableBlocker();
-    tableBlocker.setTableIdentifier(tableIdentifier);
+    tableBlocker.setTableIdentifier(tableIdentifier.getIdentifier());
     tableBlocker.setExpirationTime(System.currentTimeMillis() - 10);
     tableBlocker.setCreateTime(System.currentTimeMillis() - 20);
     
tableBlocker.setOperations(Collections.singletonList(BlockableOperation.OPTIMIZE.name()));
+    tableBlocker.setPrevBlockerId(-1L);
     persistency.insertTableBlocker(tableBlocker);
 
     TableBlocker tableBlocker2 = new TableBlocker();
-    tableBlocker2.setTableIdentifier(tableIdentifier);
+    tableBlocker2.setTableIdentifier(tableIdentifier.getIdentifier());
     tableBlocker2.setExpirationTime(System.currentTimeMillis() + 100000);
     tableBlocker2.setCreateTime(System.currentTimeMillis() - 20);
     
tableBlocker2.setOperations(Collections.singletonList(BlockableOperation.BATCH_WRITE.name()));
+    tableBlocker2.setPrevBlockerId(tableBlocker.getBlockerId());
     persistency.insertTableBlocker(tableBlocker2);
 
+    Assert.assertThrows(Exception.class, () -> 
persistency.insertTableBlocker(tableBlocker2));
+
     Assert.assertEquals(2, 
persistency.selectTableBlockers(tableIdentifier).size());
     
Assert.assertNotNull(persistency.selectTableBlocker(tableBlocker.getBlockerId()));
     
Assert.assertNotNull(persistency.selectTableBlocker(tableBlocker2.getBlockerId()));
@@ -83,15 +87,28 @@ public class TestBlockerExpiringExecutor extends 
TableServiceTestBase {
 
   private static class Persistency extends PersistentBase {
     public void insertTableBlocker(TableBlocker tableBlocker) {
-      doAs(TableBlockerMapper.class, mapper -> 
mapper.insertBlocker(tableBlocker));
+      doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
     }
 
     public List<TableBlocker> selectTableBlockers(ServerTableIdentifier 
tableIdentifier) {
-      return getAs(TableBlockerMapper.class, mapper -> 
mapper.selectBlockers(tableIdentifier, 1));
+      return getAs(
+          TableBlockerMapper.class,
+          mapper ->
+              mapper.selectBlockers(
+                  tableIdentifier.getCatalog(),
+                  tableIdentifier.getDatabase(),
+                  tableIdentifier.getTableName(),
+                  1));
     }
 
     public void deleteBlockers(ServerTableIdentifier tableIdentifier) {
-      doAs(TableBlockerMapper.class, mapper -> 
mapper.deleteBlockers(tableIdentifier));
+      doAs(
+          TableBlockerMapper.class,
+          mapper ->
+              mapper.deleteTableBlockers(
+                  tableIdentifier.getCatalog(),
+                  tableIdentifier.getDatabase(),
+                  tableIdentifier.getTableName()));
     }
 
     public TableBlocker selectTableBlocker(long blockerId) {


Reply via email to