This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new d6f6aae7f [AMORO-3685] Optimize the performance of fillSkipSet for planning (#3688)
d6f6aae7f is described below

commit d6f6aae7fa11e343a7d5f247badae3bc4120bbc6
Author: Marig_Weizhi <[email protected]>
AuthorDate: Tue Aug 5 20:29:33 2025 +0800

    [AMORO-3685] Optimize the performance of fillSkipSet for planning (#3688)
    
    * Optimize the performance of fillSkipSet for planning
    
    * Refactoring code
    
    * fix tests
    
    ---------
    
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../amoro/server/optimizing/OptimizingQueue.java   | 25 ++++++++++++++++++++++
 .../amoro/server/optimizing/SchedulingPolicy.java  |  2 --
 .../persistence/mapper/TableBlockerMapper.java     | 23 ++++++++++++++++++++
 .../amoro/server/table/blocker/TableBlocker.java   |  7 ++++--
 4 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index ff5305e2a..65b2daf03 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -21,6 +21,7 @@ package org.apache.amoro.server.optimizing;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.OptimizerProperties;
 import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.api.BlockableOperation;
 import org.apache.amoro.api.OptimizingTaskId;
 import org.apache.amoro.exception.OptimizingClosedException;
 import org.apache.amoro.exception.PersistenceException;
@@ -37,16 +38,19 @@ import org.apache.amoro.server.manager.MetricManager;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.TaskFilesPersistence;
 import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.resource.QuotaProvider;
 import org.apache.amoro.server.table.DefaultOptimizingState;
 import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.blocker.TableBlocker;
 import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.TableIdentifier;
 import org.apache.amoro.utils.CompatiblePropertyUtil;
 import org.apache.amoro.utils.ExceptionUtil;
 import org.apache.amoro.utils.MixedDataFiles;
@@ -237,11 +241,32 @@ public class OptimizingQueue extends PersistentBase {
   private void scheduleTableIfNecessary(long startTime) {
     if (planningTables.size() < maxPlanningParallelism) {
       Set<ServerTableIdentifier> skipTables = new HashSet<>(planningTables);
+      skipBlockedTables(skipTables);
       Optional.ofNullable(scheduler.scheduleTable(skipTables))
           .ifPresent(tableRuntime -> triggerAsyncPlanning(tableRuntime, skipTables, startTime));
     }
   }
 
+  private void skipBlockedTables(Set<ServerTableIdentifier> skipTables) {
+    List<TableBlocker> tableBlockerList =
+        getAs(
+            TableBlockerMapper.class,
+            mapper -> mapper.selectAllBlockers(System.currentTimeMillis()));
+    Map<TableIdentifier, ServerTableIdentifier> identifierMap = Maps.newHashMap();
+    for (ServerTableIdentifier identifier : scheduler.getTableRuntimeMap().keySet()) {
+      identifierMap.put(identifier.getIdentifier(), identifier);
+    }
+    tableBlockerList.stream()
+        .filter(blocker -> TableBlocker.conflict(BlockableOperation.OPTIMIZE, blocker))
+        .map(
+            blocker ->
+                TableIdentifier.of(
+                    blocker.getCatalog(), blocker.getDatabase(), blocker.getTableName()))
+        .map(identifierMap::get)
+        .filter(Objects::nonNull)
+        .forEach(skipTables::add);
+  }
+
   private void triggerAsyncPlanning(
       DefaultTableRuntime tableRuntime, Set<ServerTableIdentifier> skipTables, long startTime) {
     LOG.info(
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
index f25aadc12..9497cb024 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
@@ -19,7 +19,6 @@
 package org.apache.amoro.server.optimizing;
 
 import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.api.BlockableOperation;
 import org.apache.amoro.resource.ResourceGroup;
 import org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter;
 import org.apache.amoro.server.optimizing.sorter.SorterFactory;
@@ -128,7 +127,6 @@ public class SchedulingPolicy {
         .filter(
             optimizingState ->
                 !isTablePending(optimizingState)
-                    || optimizingState.isBlocked(BlockableOperation.OPTIMIZE)
                     || currentTime - optimizingState.getLastPlanTime()
                         < optimizingState.getOptimizingConfig().getMinPlanInterval())
         .forEach(tableRuntime -> originalSet.add(tableRuntime.getTableIdentifier()));
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
index 57e2ebd94..861daea44 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java
@@ -36,6 +36,29 @@ import java.util.List;
 public interface TableBlockerMapper {
   String TABLE_NAME = "table_blocker";
 
+  @Select(
+      "SELECT blocker_id,catalog_name,db_name,table_name,operations,create_time,"
+          + "expiration_time,properties FROM "
+          + TABLE_NAME
+          + " WHERE expiration_time > #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
+  @Results({
+    @Result(property = "blockerId", column = "blocker_id"),
+    @Result(property = "catalog", column = "catalog_name"),
+    @Result(property = "database", column = "db_name"),
+    @Result(property = "tableName", column = "table_name"),
+    @Result(
+        property = "operations",
+        column = "operations",
+        typeHandler = List2StringConverter.class),
+    @Result(property = "createTime", column = "create_time", typeHandler = Long2TsConverter.class),
+    @Result(
+        property = "expirationTime",
+        column = "expiration_time",
+        typeHandler = Long2TsConverter.class),
+    @Result(property = "properties", column = "properties", typeHandler = Map2StringConverter.class)
+  })
+  List<TableBlocker> selectAllBlockers(@Param("now") long now);
+
   @Select(
       "SELECT blocker_id,catalog_name,db_name,table_name,operations,create_time,"
           + "expiration_time,properties FROM "
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
index 000b5054a..e678ecfab 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java
@@ -48,8 +48,11 @@ public class TableBlocker {
 
   public static boolean conflict(
       BlockableOperation blockableOperation, List<TableBlocker> blockers) {
-    return blockers.stream()
-        .anyMatch(blocker -> blocker.getOperations().contains(blockableOperation.name()));
+    return blockers.stream().anyMatch(blocker -> conflict(blockableOperation, blocker));
+  }
+
+  public static boolean conflict(BlockableOperation blockableOperation, TableBlocker blocker) {
+    return blocker.getOperations().contains(blockableOperation.name());
   }
 
   public static TableBlocker buildTableBlocker(

Reply via email to