This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new d6f6aae7f [AMORO-3685] Optimize the performance of fillSkipSet for planning (#3688)
d6f6aae7f is described below
commit d6f6aae7fa11e343a7d5f247badae3bc4120bbc6
Author: Marig_Weizhi <[email protected]>
AuthorDate: Tue Aug 5 20:29:33 2025 +0800
[AMORO-3685] Optimize the performance of fillSkipSet for planning (#3688)
* Optimize the performance of fillSkipSet for planning
* Refactoring code
* fix tests
---------
Co-authored-by: ZhouJinsong <[email protected]>
---
.../amoro/server/optimizing/OptimizingQueue.java | 25 ++++++++++++++++++++++
.../amoro/server/optimizing/SchedulingPolicy.java | 2 --
.../persistence/mapper/TableBlockerMapper.java | 23 ++++++++++++++++++++
.../amoro/server/table/blocker/TableBlocker.java | 7 ++++--
4 files changed, 53 insertions(+), 4 deletions(-)
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index ff5305e2a..65b2daf03 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -21,6 +21,7 @@ package org.apache.amoro.server.optimizing;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.exception.PersistenceException;
@@ -37,16 +38,19 @@ import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TaskFilesPersistence;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultOptimizingState;
import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.blocker.TableBlocker;
import org.apache.amoro.server.utils.IcebergTableUtil;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.utils.CompatiblePropertyUtil;
import org.apache.amoro.utils.ExceptionUtil;
import org.apache.amoro.utils.MixedDataFiles;
@@ -237,11 +241,32 @@ public class OptimizingQueue extends PersistentBase {
private void scheduleTableIfNecessary(long startTime) {
if (planningTables.size() < maxPlanningParallelism) {
Set<ServerTableIdentifier> skipTables = new HashSet<>(planningTables);
+ skipBlockedTables(skipTables);
Optional.ofNullable(scheduler.scheduleTable(skipTables))
.ifPresent(tableRuntime -> triggerAsyncPlanning(tableRuntime,
skipTables, startTime));
}
}
+ private void skipBlockedTables(Set<ServerTableIdentifier> skipTables) {
+ List<TableBlocker> tableBlockerList =
+ getAs(
+ TableBlockerMapper.class,
+ mapper -> mapper.selectAllBlockers(System.currentTimeMillis()));
+ Map<TableIdentifier, ServerTableIdentifier> identifierMap =
Maps.newHashMap();
+ for (ServerTableIdentifier identifier :
scheduler.getTableRuntimeMap().keySet()) {
+ identifierMap.put(identifier.getIdentifier(), identifier);
+ }
+ tableBlockerList.stream()
+ .filter(blocker -> TableBlocker.conflict(BlockableOperation.OPTIMIZE,
blocker))
+ .map(
+ blocker ->
+ TableIdentifier.of(
+ blocker.getCatalog(), blocker.getDatabase(),
blocker.getTableName()))
+ .map(identifierMap::get)
+ .filter(Objects::nonNull)
+ .forEach(skipTables::add);
+ }
+
private void triggerAsyncPlanning(
DefaultTableRuntime tableRuntime, Set<ServerTableIdentifier> skipTables,
long startTime) {
LOG.info(
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
index f25aadc12..9497cb024 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
@@ -19,7 +19,6 @@
package org.apache.amoro.server.optimizing;
import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter;
import org.apache.amoro.server.optimizing.sorter.SorterFactory;
@@ -128,7 +127,6 @@ public class SchedulingPolicy {
.filter(
optimizingState ->
!isTablePending(optimizingState)
- || optimizingState.isBlocked(BlockableOperation.OPTIMIZE)
|| currentTime - optimizingState.getLastPlanTime()
<
optimizingState.getOptimizingConfig().getMinPlanInterval())
.forEach(tableRuntime ->
originalSet.add(tableRuntime.getTableIdentifier()));
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
index 57e2ebd94..861daea44 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
@@ -36,6 +36,29 @@ import java.util.List;
public interface TableBlockerMapper {
String TABLE_NAME = "table_blocker";
+ @Select(
+ "SELECT
blocker_id,catalog_name,db_name,table_name,operations,create_time,"
+ + "expiration_time,properties FROM "
+ + TABLE_NAME
+ + " WHERE expiration_time > #{now,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
+ @Results({
+ @Result(property = "blockerId", column = "blocker_id"),
+ @Result(property = "catalog", column = "catalog_name"),
+ @Result(property = "database", column = "db_name"),
+ @Result(property = "tableName", column = "table_name"),
+ @Result(
+ property = "operations",
+ column = "operations",
+ typeHandler = List2StringConverter.class),
+ @Result(property = "createTime", column = "create_time", typeHandler =
Long2TsConverter.class),
+ @Result(
+ property = "expirationTime",
+ column = "expiration_time",
+ typeHandler = Long2TsConverter.class),
+ @Result(property = "properties", column = "properties", typeHandler =
Map2StringConverter.class)
+ })
+ List<TableBlocker> selectAllBlockers(@Param("now") long now);
+
@Select(
"SELECT
blocker_id,catalog_name,db_name,table_name,operations,create_time,"
+ "expiration_time,properties FROM "
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
index 000b5054a..e678ecfab 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
@@ -48,8 +48,11 @@ public class TableBlocker {
public static boolean conflict(
BlockableOperation blockableOperation, List<TableBlocker> blockers) {
- return blockers.stream()
- .anyMatch(blocker ->
blocker.getOperations().contains(blockableOperation.name()));
+ return blockers.stream().anyMatch(blocker -> conflict(blockableOperation,
blocker));
+ }
+
+ public static boolean conflict(BlockableOperation blockableOperation,
TableBlocker blocker) {
+ return blocker.getOperations().contains(blockableOperation.name());
}
public static TableBlocker buildTableBlocker(