This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 78708b8f8 [AMORO-3078]: A lock-free TableBlocker implementation that
no longer relies on TableRuntime. (#3079)
78708b8f8 is described below
commit 78708b8f80210da7dbd6b5e1dc130333e00c286c
Author: baiyangtx <[email protected]>
AuthorDate: Tue Aug 13 17:59:04 2024 +0800
[AMORO-3078]: A lock-free TableBlocker implementation that no longer relies on
TableRuntime. (#3079)
* Fix unit tests
* stash
* unit tests pass
* spotless
* address reviewers' comments
* fix conflict
* fix spotless
* fix spotless
---
.../amoro/server/catalog/InternalCatalog.java | 5 +-
.../amoro/server/persistence/PersistentBase.java | 14 ++
.../persistence/mapper/TableBlockerMapper.java | 80 ++++++-----
.../amoro/server/table/DefaultTableService.java | 91 +++++++++++--
.../apache/amoro/server/table/TableRuntime.java | 147 ++-------------------
.../amoro/server/table/blocker/TableBlocker.java | 83 +++++++++++-
.../table/executor/BlockerExpiringExecutor.java | 6 +-
.../src/main/resources/derby/ams-derby-init.sql | 4 +-
.../src/main/resources/mysql/ams-mysql-init.sql | 3 +-
.../resources/mysql/upgrade-0.6.1-to-0.7.0.sql | 2 +-
.../src/main/resources/mysql/upgrade.sql | 6 +
.../main/resources/postgres/ams-postgres-init.sql | 8 +-
.../resources/postgres/upgrade-0.6.1-to-0.7.0.sql | 4 +-
.../src/main/resources/postgres/upgrade.sql | 7 +
.../server/persistence/mapper/MySQLTestBase.java | 21 ---
.../executor/TestBlockerExpiringExecutor.java | 27 +++-
16 files changed, 280 insertions(+), 228 deletions(-)
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
index 952e7b207..4bdfe2373 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
@@ -165,7 +165,10 @@ public abstract class InternalCatalog extends
ServerCatalog {
doAs(
TableMetaMapper.class,
mapper -> mapper.deleteTableMetaById(tableIdentifier.getId())),
- () -> doAs(TableBlockerMapper.class, mapper ->
mapper.deleteBlockers(tableIdentifier)),
+ () ->
+ doAs(
+ TableBlockerMapper.class,
+ mapper -> mapper.deleteTableBlockers(this.name(),
databaseName, tableName)),
() -> dropTableInternal(databaseName, tableName),
() ->
doAsExisted(
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
index 07e688793..8c9d00389 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
@@ -42,6 +42,20 @@ public abstract class PersistentBase {
.openSession(TransactionIsolationLevel.READ_COMMITTED));
}
+ public final <T> Long updateAs(Class<T> mapperClz, Function<T, Number>
updateFunction) {
+ try (NestedSqlSession session = beginSession()) {
+ try {
+ T mapper = getMapper(session, mapperClz);
+ Number number = updateFunction.apply(mapper);
+ session.commit();
+ return number.longValue();
+ } catch (Throwable t) {
+ session.rollback();
+ throw AmoroRuntimeException.wrap(t, PersistenceException::new);
+ }
+ }
+ }
+
protected final <T> void doAs(Class<T> mapperClz, Consumer<T> consumer) {
try (NestedSqlSession session = beginSession()) {
try {
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
index 2b772cfa5..57e2ebd94 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
@@ -18,7 +18,6 @@
package org.apache.amoro.server.persistence.mapper;
-import org.apache.amoro.api.ServerTableIdentifier;
import org.apache.amoro.server.persistence.converter.List2StringConverter;
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
import org.apache.amoro.server.persistence.converter.Map2StringConverter;
@@ -42,14 +41,15 @@ public interface TableBlockerMapper {
+ "expiration_time,properties FROM "
+ TABLE_NAME
+ " "
- + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name =
#{tableIdentifier.database} "
- + "AND table_name = #{tableIdentifier.tableName} "
+ + "WHERE catalog_name = #{catalog} "
+ + "AND db_name = #{database} "
+ + "AND table_name = #{tableName} "
+ "AND expiration_time > #{now,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
@Results({
@Result(property = "blockerId", column = "blocker_id"),
- @Result(property = "tableIdentifier.catalog", column = "catalog_name"),
- @Result(property = "tableIdentifier.database", column = "db_name"),
- @Result(property = "tableIdentifier.tableName", column = "table_name"),
+ @Result(property = "catalog", column = "catalog_name"),
+ @Result(property = "database", column = "db_name"),
+ @Result(property = "tableName", column = "table_name"),
@Result(
property = "operations",
column = "operations",
@@ -62,7 +62,10 @@ public interface TableBlockerMapper {
@Result(property = "properties", column = "properties", typeHandler =
Map2StringConverter.class)
})
List<TableBlocker> selectBlockers(
- @Param("tableIdentifier") ServerTableIdentifier tableIdentifier,
@Param("now") long now);
+ @Param("catalog") String catalog,
+ @Param("database") String database,
+ @Param("tableName") String tableName,
+ @Param("now") long now);
@Select(
"SELECT
blocker_id,catalog_name,db_name,table_name,operations,create_time,"
@@ -73,9 +76,9 @@ public interface TableBlockerMapper {
+ "AND expiration_time > #{now,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
@Results({
@Result(property = "blockerId", column = "blocker_id"),
- @Result(property = "tableIdentifier.catalog", column = "catalog_name"),
- @Result(property = "tableIdentifier.database", column = "db_name"),
- @Result(property = "tableIdentifier.tableName", column = "table_name"),
+ @Result(property = "catalog", column = "catalog_name"),
+ @Result(property = "database", column = "db_name"),
+ @Result(property = "tableName", column = "table_name"),
@Result(
property = "operations",
column = "operations",
@@ -89,31 +92,34 @@ public interface TableBlockerMapper {
})
TableBlocker selectBlocker(@Param("blockerId") long blockerId, @Param("now")
long now);
+ @Update(
+ "UPDATE "
+ + TABLE_NAME
+ + " "
+ + "SET "
+ + "expiration_time = #{expiration,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} "
+ + "WHERE blocker_id = #{blockerId} "
+ + "AND expiration_time > #{now,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
+ int renewBlocker(
+ @Param("blockerId") long blockerId,
+ @Param("now") long now,
+ @Param("expiration") long expiration);
+
@Insert(
"INSERT INTO "
+ TABLE_NAME
- + " (catalog_name,db_name,table_name,operations,create_time,"
- + "expiration_time,properties) VALUES ("
- + "#{blocker.tableIdentifier.catalog},"
- + "#{blocker.tableIdentifier.database},"
- + "#{blocker.tableIdentifier.tableName},"
+ +
"(catalog_name,db_name,table_name,operations,create_time,expiration_time,prev_blocker_id,properties)
"
+ + "VALUES ( "
+ + "#{blocker.catalog},"
+ + "#{blocker.database},"
+ + "#{blocker.tableName},"
+
"#{blocker.operations,typeHandler=org.apache.amoro.server.persistence.converter.List2StringConverter},"
+
"#{blocker.createTime,typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
+
"#{blocker.expirationTime,typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
- +
"#{blocker.properties,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter}"
- + ")")
+ + "#{blocker.prevBlockerId},"
+ +
"#{blocker.properties,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter})")
@Options(useGeneratedKeys = true, keyProperty = "blocker.blockerId")
- void insertBlocker(@Param("blocker") TableBlocker blocker);
-
- @Update(
- "UPDATE "
- + TABLE_NAME
- + " SET "
- + "expiration_time = #{expirationTime, "
- +
"typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} "
- + "WHERE blocker_id = #{blockerId}")
- void updateBlockerExpirationTime(
- @Param("blockerId") long blockerId, @Param("expirationTime") long
expirationTime);
+ int insert(@Param("blocker") TableBlocker blocker);
@Delete("DELETE FROM " + TABLE_NAME + " " + "WHERE blocker_id =
#{blockerId}")
void deleteBlocker(@Param("blockerId") long blockerId);
@@ -122,17 +128,23 @@ public interface TableBlockerMapper {
"DELETE FROM "
+ TABLE_NAME
+ " "
- + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name =
#{tableIdentifier.database} "
- + "AND table_name = #{tableIdentifier.tableName} "
+ + "WHERE catalog_name = #{catalog} AND db_name = #{database} "
+ + "AND table_name = #{tableName} "
+ "AND expiration_time <= #{now,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
int deleteExpiredBlockers(
- @Param("tableIdentifier") ServerTableIdentifier tableIdentifier,
@Param("now") long now);
+ @Param("catalog") String catalog,
+ @Param("database") String database,
+ @Param("tableName") String tableName,
+ @Param("now") long now);
@Delete(
"DELETE FROM "
+ TABLE_NAME
+ " "
- + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name =
#{tableIdentifier.database} "
- + "AND table_name = #{tableIdentifier.tableName}")
- int deleteBlockers(@Param("tableIdentifier") ServerTableIdentifier
tableIdentifier);
+ + "WHERE catalog_name = #{catalog} AND db_name = #{database} "
+ + "AND table_name = #{tableName}")
+ int deleteTableBlockers(
+ @Param("catalog") String catalog,
+ @Param("database") String database,
+ @Param("tableName") String tableName);
}
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index b3266b9d7..9b54d18c9 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -35,12 +35,15 @@ import org.apache.amoro.server.catalog.ExternalCatalog;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.exception.AlreadyExistsException;
+import org.apache.amoro.server.exception.BlockerConflictException;
import org.apache.amoro.server.exception.IllegalMetadataException;
import org.apache.amoro.server.exception.ObjectNotExistsException;
+import org.apache.amoro.server.exception.PersistenceException;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.table.blocker.TableBlocker;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
@@ -55,6 +58,7 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -75,6 +79,7 @@ import java.util.stream.Collectors;
public class DefaultTableService extends StatedPersistentBase implements
TableService {
public static final Logger LOG =
LoggerFactory.getLogger(DefaultTableService.class);
+ private static final int TABLE_BLOCKER_RETRY = 3;
private final long externalCatalogRefreshingInterval;
private final long blockerTimeout;
private final Map<String, InternalCatalog> internalCatalogMap = new
ConcurrentHashMap<>();
@@ -251,32 +256,90 @@ public class DefaultTableService extends
StatedPersistentBase implements TableSe
TableIdentifier tableIdentifier,
List<BlockableOperation> operations,
Map<String, String> properties) {
- checkStarted();
- return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
- .block(operations, properties, blockerTimeout)
- .buildBlocker();
+ Preconditions.checkNotNull(operations, "operations should not be null");
+ Preconditions.checkArgument(!operations.isEmpty(), "operations should not
be empty");
+ Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must >
0");
+ String catalog = tableIdentifier.getCatalog();
+ String database = tableIdentifier.getDatabase();
+ String table = tableIdentifier.getTableName();
+ int tryCount = 0;
+ while (tryCount++ < TABLE_BLOCKER_RETRY) {
+ long now = System.currentTimeMillis();
+ doAs(
+ TableBlockerMapper.class,
+ mapper -> mapper.deleteExpiredBlockers(catalog, database, table,
now));
+ List<TableBlocker> tableBlockers =
+ getAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.selectBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ now));
+ if (TableBlocker.conflict(operations, tableBlockers)) {
+ throw new BlockerConflictException(operations + " is conflict with " +
tableBlockers);
+ }
+ Optional<Long> maxBlockerOpt =
+ tableBlockers.stream()
+ .map(TableBlocker::getBlockerId)
+ .max(Comparator.comparingLong(l -> l));
+ long prevBlockerId = maxBlockerOpt.orElse(-1L);
+
+ TableBlocker tableBlocker =
+ TableBlocker.buildTableBlocker(
+ tableIdentifier, operations, properties, now, blockerTimeout,
prevBlockerId);
+ try {
+ doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
+ if (tableBlocker.getBlockerId() > 0) {
+ return tableBlocker.buildBlocker();
+ }
+ } catch (PersistenceException e) {
+ LOG.warn("An exception occurs when creating a blocker:{}",
tableBlocker, e);
+ }
+ }
+ throw new BlockerConflictException("Failed to create a blocker: conflict
meet max retry");
}
@Override
public void releaseBlocker(TableIdentifier tableIdentifier, String
blockerId) {
- checkStarted();
- TableRuntime tableRuntime =
getRuntime(getServerTableIdentifier(tableIdentifier));
- if (tableRuntime != null) {
- tableRuntime.release(blockerId);
- }
+ doAs(TableBlockerMapper.class, mapper ->
mapper.deleteBlocker(Long.parseLong(blockerId)));
}
@Override
public long renewBlocker(TableIdentifier tableIdentifier, String blockerId) {
- checkStarted();
- TableRuntime tableRuntime =
getAndCheckExist(getServerTableIdentifier(tableIdentifier));
- return tableRuntime.renew(blockerId, blockerTimeout);
+ int retry = 0;
+ while (retry++ < TABLE_BLOCKER_RETRY) {
+ long now = System.currentTimeMillis();
+ long id = Long.parseLong(blockerId);
+ TableBlocker tableBlocker =
+ getAs(TableBlockerMapper.class, mapper -> mapper.selectBlocker(id,
now));
+ if (tableBlocker == null) {
+ throw new ObjectNotExistsException("Blocker " + blockerId + " of " +
tableIdentifier);
+ }
+ long current = System.currentTimeMillis();
+ long expirationTime = now + blockerTimeout;
+ long effectRow =
+ updateAs(
+ TableBlockerMapper.class, mapper -> mapper.renewBlocker(id,
current, expirationTime));
+ if (effectRow > 0) {
+ return expirationTime;
+ }
+ }
+ throw new BlockerConflictException("Failed to renew a blocker: conflict
meet max retry");
}
@Override
public List<Blocker> getBlockers(TableIdentifier tableIdentifier) {
- checkStarted();
- return
getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier)).getBlockers().stream()
+ return getAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.selectBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ System.currentTimeMillis()))
+ .stream()
.map(TableBlocker::buildBlocker)
.collect(Collectors.toList());
}
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
index e97143133..95c68e08d 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
@@ -26,8 +26,6 @@ import org.apache.amoro.api.StateField;
import org.apache.amoro.api.config.OptimizingConfig;
import org.apache.amoro.api.config.TableConfiguration;
import org.apache.amoro.server.AmoroServiceConstants;
-import org.apache.amoro.server.exception.BlockerConflictException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.server.metrics.MetricRegistry;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingStatus;
@@ -43,24 +41,18 @@ import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
-import org.apache.amoro.table.blocker.RenewableBlocker;
import org.apache.iceberg.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
public class TableRuntime extends StatedPersistentBase {
@@ -95,7 +87,6 @@ public class TableRuntime extends StatedPersistentBase {
@StateField private volatile OptimizingEvaluator.PendingInput pendingInput;
private volatile long lastPlanTime;
private final TableOptimizingMetrics optimizingMetrics;
- private final ReentrantLock blockerLock = new ReentrantLock();
protected TableRuntime(
ServerTableIdentifier tableIdentifier,
@@ -513,96 +504,6 @@ public class TableRuntime extends StatedPersistentBase {
.doubleValue();
}
- /**
- * Get all valid blockers.
- *
- * @return all valid blockers
- */
- public List<TableBlocker> getBlockers() {
- blockerLock.lock();
- try {
- return getAs(
- TableBlockerMapper.class,
- mapper -> mapper.selectBlockers(tableIdentifier,
System.currentTimeMillis()));
- } finally {
- blockerLock.unlock();
- }
- }
-
- /**
- * Block some operations for table.
- *
- * @param operations - operations to be blocked
- * @param properties -
- * @param blockerTimeout -
- * @return TableBlocker if success
- */
- public TableBlocker block(
- List<BlockableOperation> operations,
- @Nonnull Map<String, String> properties,
- long blockerTimeout) {
- Preconditions.checkNotNull(operations, "operations should not be null");
- Preconditions.checkArgument(!operations.isEmpty(), "operations should not
be empty");
- Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must >
0");
- blockerLock.lock();
- try {
- long now = System.currentTimeMillis();
- List<TableBlocker> tableBlockers =
- getAs(TableBlockerMapper.class, mapper ->
mapper.selectBlockers(tableIdentifier, now));
- if (conflict(operations, tableBlockers)) {
- throw new BlockerConflictException(operations + " is conflict with " +
tableBlockers);
- }
- TableBlocker tableBlocker =
- buildTableBlocker(tableIdentifier, operations, properties, now,
blockerTimeout);
- doAs(TableBlockerMapper.class, mapper ->
mapper.insertBlocker(tableBlocker));
- return tableBlocker;
- } finally {
- blockerLock.unlock();
- }
- }
-
- /**
- * Renew blocker.
- *
- * @param blockerId - blockerId
- * @param blockerTimeout - timeout
- * @throws IllegalStateException if blocker not exist
- */
- public long renew(String blockerId, long blockerTimeout) {
- blockerLock.lock();
- try {
- long now = System.currentTimeMillis();
- TableBlocker tableBlocker =
- getAs(
- TableBlockerMapper.class,
- mapper -> mapper.selectBlocker(Long.parseLong(blockerId), now));
- if (tableBlocker == null) {
- throw new ObjectNotExistsException("Blocker " + blockerId + " of " +
tableIdentifier);
- }
- long expirationTime = now + blockerTimeout;
- doAs(
- TableBlockerMapper.class,
- mapper ->
mapper.updateBlockerExpirationTime(Long.parseLong(blockerId), expirationTime));
- return expirationTime;
- } finally {
- blockerLock.unlock();
- }
- }
-
- /**
- * Release blocker, succeed when blocker not exist.
- *
- * @param blockerId - blockerId
- */
- public void release(String blockerId) {
- blockerLock.lock();
- try {
- doAs(TableBlockerMapper.class, mapper ->
mapper.deleteBlocker(Long.parseLong(blockerId)));
- } finally {
- blockerLock.unlock();
- }
- }
-
/**
* Check if operation are blocked now.
*
@@ -610,43 +511,15 @@ public class TableRuntime extends StatedPersistentBase {
* @return true if blocked
*/
public boolean isBlocked(BlockableOperation operation) {
- blockerLock.lock();
- try {
- List<TableBlocker> tableBlockers =
- getAs(
- TableBlockerMapper.class,
- mapper -> mapper.selectBlockers(tableIdentifier,
System.currentTimeMillis()));
- return conflict(operation, tableBlockers);
- } finally {
- blockerLock.unlock();
- }
- }
-
- private boolean conflict(
- List<BlockableOperation> blockableOperations, List<TableBlocker>
blockers) {
- return blockableOperations.stream().anyMatch(operation ->
conflict(operation, blockers));
- }
-
- private boolean conflict(BlockableOperation blockableOperation,
List<TableBlocker> blockers) {
- return blockers.stream()
- .anyMatch(blocker ->
blocker.getOperations().contains(blockableOperation.name()));
- }
-
- private TableBlocker buildTableBlocker(
- ServerTableIdentifier tableIdentifier,
- List<BlockableOperation> operations,
- Map<String, String> properties,
- long now,
- long blockerTimeout) {
- TableBlocker tableBlocker = new TableBlocker();
- tableBlocker.setTableIdentifier(tableIdentifier);
- tableBlocker.setCreateTime(now);
- tableBlocker.setExpirationTime(now + blockerTimeout);
- tableBlocker.setOperations(
-
operations.stream().map(BlockableOperation::name).collect(Collectors.toList()));
- HashMap<String, String> propertiesOfTableBlocker = new
HashMap<>(properties);
- propertiesOfTableBlocker.put(RenewableBlocker.BLOCKER_TIMEOUT,
blockerTimeout + "");
- tableBlocker.setProperties(propertiesOfTableBlocker);
- return tableBlocker;
+ List<TableBlocker> tableBlockers =
+ getAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.selectBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ System.currentTimeMillis()));
+ return TableBlocker.conflict(operation, tableBlockers);
}
}
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
index 8cb68d0a1..000b5054a 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
@@ -20,31 +20,89 @@ package org.apache.amoro.server.table.blocker;
import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.api.Blocker;
-import org.apache.amoro.api.ServerTableIdentifier;
+import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.blocker.RenewableBlocker;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class TableBlocker {
- private ServerTableIdentifier tableIdentifier;
+ private String catalog;
+ private String database;
+ private String tableName;
private long blockerId;
private List<String> operations;
private long createTime;
private long expirationTime;
+ private long prevBlockerId;
private Map<String, String> properties;
+ public static boolean conflict(
+ List<BlockableOperation> blockableOperations, List<TableBlocker>
blockers) {
+ return blockableOperations.stream().anyMatch(operation ->
conflict(operation, blockers));
+ }
+
+ public static boolean conflict(
+ BlockableOperation blockableOperation, List<TableBlocker> blockers) {
+ return blockers.stream()
+ .anyMatch(blocker ->
blocker.getOperations().contains(blockableOperation.name()));
+ }
+
+ public static TableBlocker buildTableBlocker(
+ TableIdentifier tableIdentifier,
+ List<BlockableOperation> operations,
+ Map<String, String> properties,
+ long now,
+ long blockerTimeout,
+ long prevBlockerId) {
+ TableBlocker tableBlocker = new TableBlocker();
+ tableBlocker.setCatalog(tableIdentifier.getCatalog());
+ tableBlocker.setDatabase(tableIdentifier.getDatabase());
+ tableBlocker.setTableName(tableIdentifier.getTableName());
+ tableBlocker.setCreateTime(now);
+ tableBlocker.setExpirationTime(now + blockerTimeout);
+ tableBlocker.setOperations(
+
operations.stream().map(BlockableOperation::name).collect(Collectors.toList()));
+ HashMap<String, String> propertiesOfTableBlocker = new
HashMap<>(properties);
+ propertiesOfTableBlocker.put(RenewableBlocker.BLOCKER_TIMEOUT,
blockerTimeout + "");
+ tableBlocker.setProperties(propertiesOfTableBlocker);
+ return tableBlocker;
+ }
+
public TableBlocker() {}
- public ServerTableIdentifier getTableIdentifier() {
- return tableIdentifier;
+ public String getCatalog() {
+ return catalog;
+ }
+
+ public void setCatalog(String catalog) {
+ this.catalog = catalog;
+ }
+
+ public String getDatabase() {
+ return database;
}
- public void setTableIdentifier(ServerTableIdentifier tableIdentifier) {
- this.tableIdentifier = tableIdentifier;
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public void setTableIdentifier(TableIdentifier tableIdentifier) {
+ this.catalog = tableIdentifier.getCatalog();
+ this.database = tableIdentifier.getDatabase();
+ this.tableName = tableIdentifier.getTableName();
}
public long getBlockerId() {
@@ -79,6 +137,14 @@ public class TableBlocker {
this.expirationTime = expirationTime;
}
+ public void setPrevBlockerId(long prevBlockerId) {
+ this.prevBlockerId = prevBlockerId;
+ }
+
+ public long getPrevBlockerId() {
+ return this.prevBlockerId;
+ }
+
public Map<String, String> getProperties() {
return properties;
}
@@ -99,11 +165,14 @@ public class TableBlocker {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("tableIdentifier", tableIdentifier)
+ .add("catalog", catalog)
+ .add("database", database)
+ .add("tableName", tableName)
.add("blockerId", blockerId)
.add("operations", operations)
.add("createTime", createTime)
.add("expirationTime", expirationTime)
+ .add("prevBlockerId", prevBlockerId)
.add("properties", properties)
.toString();
}
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java
index 169819815..909d7c6a0 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java
@@ -55,11 +55,13 @@ public class BlockerExpiringExecutor extends
BaseTableExecutor {
private static class Persistency extends PersistentBase {
public void doExpiring(TableRuntime tableRuntime) {
+ String catalog = tableRuntime.getTableIdentifier().getCatalog();
+ String database = tableRuntime.getTableIdentifier().getDatabase();
+ String table = tableRuntime.getTableIdentifier().getTableName();
doAs(
TableBlockerMapper.class,
mapper ->
- mapper.deleteExpiredBlockers(
- tableRuntime.getTableIdentifier(),
System.currentTimeMillis()));
+ mapper.deleteExpiredBlockers(catalog, database, table,
System.currentTimeMillis()));
}
}
}
diff --git
a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql
b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql
index e7856a57d..a7dc2ecf1 100644
--- a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql
@@ -200,5 +200,7 @@ CREATE TABLE table_blocker (
create_time timestamp DEFAULT NULL,
expiration_time timestamp DEFAULT NULL,
properties clob(64m),
- PRIMARY KEY (blocker_id)
+ prev_blocker_id bigint NOT NULL DEFAULT -1,
+ PRIMARY KEY (blocker_id),
+ CONSTRAINT prev_uq UNIQUE (catalog_name, db_name, table_name,
prev_blocker_id)
);
diff --git
a/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql
b/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql
index 82fe7072d..45bc86092 100644
--- a/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql
@@ -219,6 +219,7 @@ CREATE TABLE `table_blocker` (
`create_time` timestamp NULL DEFAULT NULL COMMENT 'Blocker create time',
`expiration_time` timestamp NULL DEFAULT NULL COMMENT 'Blocker expiration
time',
`properties` mediumtext COMMENT 'Blocker properties',
+ `prev_blocker_id` bigint(20) NOT NULL DEFAULT -1 COMMENT 'prev blocker id
when created',
PRIMARY KEY (`blocker_id`),
- KEY `table_index` (`catalog_name`,`db_name`,`table_name`)
+ UNIQUE KEY `uq_prev` (`catalog_name`,`db_name`,`table_name`,
`prev_blocker_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Table blockers'
ROW_FORMAT=DYNAMIC;
diff --git
a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql
b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql
index 048919990..3fff47388 100644
---
a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql
+++
b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql
@@ -17,4 +17,4 @@ ALTER TABLE table_identifier CHANGE COLUMN table_name
table_name varchar(256) NO
ALTER TABLE table_optimizing_process CHANGE COLUMN table_name table_name
varchar(256) NOT NULL;
ALTER TABLE table_metadata CHANGE COLUMN table_name table_name varchar(256)
NOT NULL;
ALTER TABLE table_runtime CHANGE COLUMN table_name table_name varchar(256) NOT
NULL;
-ALTER TABLE table_blocker CHANGE COLUMN table_name table_name varchar(256) NOT
NULL;
\ No newline at end of file
+ALTER TABLE table_blocker CHANGE COLUMN table_name table_name varchar(256) NOT
NULL;
diff --git a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql
b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql
index 69f409555..a457d458d 100644
--- a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql
@@ -15,3 +15,9 @@
-- If you have any changes to the AMS database, please record them in this
file.
-- We will confirm the corresponding version of these upgrade scripts when
releasing.
+
+-- NEW SCHEMA CHANGE FOR CAS BASE BLOCKER
+TRUNCATE TABLE `table_blocker`;
+ALTER TABLE `table_blocker` DROP INDEX `table_index`;
+ALTER TABLE `table_blocker` ADD COLUMN `prev_blocker_id` bigint(20) NOT NULL
DEFAULT -1 COMMENT 'prev blocker id when created';
+ALTER TABLE `table_blocker` ADD UNIQUE KEY `uq_prev`
(`catalog_name`,`db_name`,`table_name`, `prev_blocker_id`);
diff --git
a/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql
index dd6f0e0dd..40688a5ed 100644
---
a/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql
+++
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql
@@ -352,9 +352,10 @@ CREATE TABLE table_blocker
operations VARCHAR(128) NOT NULL,
create_time TIMESTAMP,
expiration_time TIMESTAMP,
- properties TEXT
+ properties TEXT,
+ prev_blocker_id BIGINT NOT NULL DEFAULT -1
);
-CREATE INDEX blocker_index ON table_optimizing_process (catalog_name, db_name,
table_name);
+CREATE UNIQUE INDEX uq_prev ON table_blocker (catalog_name, db_name,
table_name, prev_blocker_id);
COMMENT ON TABLE table_blocker IS 'Table blockers';
COMMENT ON COLUMN table_blocker.blocker_id IS 'Blocker unique ID';
@@ -364,4 +365,5 @@ COMMENT ON COLUMN table_blocker.table_name IS 'Table name';
COMMENT ON COLUMN table_blocker.operations IS 'Blocked operations';
COMMENT ON COLUMN table_blocker.create_time IS 'Blocker create time';
COMMENT ON COLUMN table_blocker.expiration_time IS 'Blocker expiration time';
-COMMENT ON COLUMN table_blocker.properties IS 'Blocker properties';
\ No newline at end of file
+COMMENT ON COLUMN table_blocker.properties IS 'Blocker properties';
+COMMENT ON COLUMN table_blocker.prev_blocker_id is 'prev blocker id when
created';
diff --git
a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql
index c1bb51995..84cc86db2 100644
---
a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql
+++
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql
@@ -20,4 +20,6 @@ ALTER TABLE table_identifier ALTER COLUMN table_name TYPE
varchar(256) NOT NULL;
ALTER TABLE table_optimizing_process ALTER COLUMN table_name TYPE varchar(256)
NOT NULL;
ALTER TABLE table_metadata ALTER COLUMN table_name TYPE varchar(256) NOT NULL;
ALTER TABLE table_runtime ALTER COLUMN table_name TYPE varchar(256) NOT NULL;
-ALTER TABLE table_blocker ALTER COLUMN table_name TYPE varchar(256) NOT NULL;
\ No newline at end of file
+ALTER TABLE table_blocker ALTER COLUMN table_name TYPE varchar(256) NOT NULL;
+
+
diff --git a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql
b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql
index 69f409555..0b07e8805 100644
--- a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql
@@ -15,3 +15,10 @@
-- If you have any changes to the AMS database, please record them in this
file.
-- We will confirm the corresponding version of these upgrade scripts when
releasing.
+
+-- NEW SCHEMA CHANGE FOR CAS-BASED BLOCKER (drops old blocker_index, adds uq_prev)
+TRUNCATE TABLE table_blocker;
+DROP INDEX IF EXISTS blocker_index;
+ALTER TABLE table_blocker ADD COLUMN prev_blocker_id BIGINT NOT NULL DEFAULT -1;
+COMMENT ON COLUMN table_blocker.prev_blocker_id IS 'prev blocker id when
created';
+CREATE UNIQUE INDEX uq_prev ON table_blocker (catalog_name, db_name,
table_name, prev_blocker_id);
diff --git
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java
deleted file mode 100644
index 0ca6d6ced..000000000
---
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.persistence.mapper;
-
-public class MySQLTestBase {}
diff --git
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java
index 5a38cea30..ef87ea60f 100644
---
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java
+++
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java
@@ -55,19 +55,23 @@ public class TestBlockerExpiringExecutor extends
TableServiceTestBase {
public void testExpireBlocker() {
BlockerExpiringExecutor blockerExpiringExecutor = new
BlockerExpiringExecutor(tableManager);
TableBlocker tableBlocker = new TableBlocker();
- tableBlocker.setTableIdentifier(tableIdentifier);
+ tableBlocker.setTableIdentifier(tableIdentifier.getIdentifier());
tableBlocker.setExpirationTime(System.currentTimeMillis() - 10);
tableBlocker.setCreateTime(System.currentTimeMillis() - 20);
tableBlocker.setOperations(Collections.singletonList(BlockableOperation.OPTIMIZE.name()));
+ tableBlocker.setPrevBlockerId(-1L);
persistency.insertTableBlocker(tableBlocker);
TableBlocker tableBlocker2 = new TableBlocker();
- tableBlocker2.setTableIdentifier(tableIdentifier);
+ tableBlocker2.setTableIdentifier(tableIdentifier.getIdentifier());
tableBlocker2.setExpirationTime(System.currentTimeMillis() + 100000);
tableBlocker2.setCreateTime(System.currentTimeMillis() - 20);
tableBlocker2.setOperations(Collections.singletonList(BlockableOperation.BATCH_WRITE.name()));
+ tableBlocker2.setPrevBlockerId(tableBlocker.getBlockerId());
persistency.insertTableBlocker(tableBlocker2);
+ Assert.assertThrows(Exception.class, () ->
persistency.insertTableBlocker(tableBlocker2));
+
Assert.assertEquals(2,
persistency.selectTableBlockers(tableIdentifier).size());
Assert.assertNotNull(persistency.selectTableBlocker(tableBlocker.getBlockerId()));
Assert.assertNotNull(persistency.selectTableBlocker(tableBlocker2.getBlockerId()));
@@ -83,15 +87,28 @@ public class TestBlockerExpiringExecutor extends
TableServiceTestBase {
private static class Persistency extends PersistentBase {
public void insertTableBlocker(TableBlocker tableBlocker) {
- doAs(TableBlockerMapper.class, mapper ->
mapper.insertBlocker(tableBlocker));
+ doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
}
public List<TableBlocker> selectTableBlockers(ServerTableIdentifier
tableIdentifier) {
- return getAs(TableBlockerMapper.class, mapper ->
mapper.selectBlockers(tableIdentifier, 1));
+ return getAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.selectBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ 1));
}
public void deleteBlockers(ServerTableIdentifier tableIdentifier) {
- doAs(TableBlockerMapper.class, mapper ->
mapper.deleteBlockers(tableIdentifier));
+ doAs(
+ TableBlockerMapper.class,
+ mapper ->
+ mapper.deleteTableBlockers(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName()));
}
public TableBlocker selectTableBlocker(long blockerId) {