This is an automated email from the ASF dual-hosted git repository.

nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new cc2968800 [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter (#3240)
cc2968800 is described below

commit cc29688004ecd0b24b5bbafaa2f8326cb19a087e
Author: 7hong <[email protected]>
AuthorDate: Wed Oct 16 20:31:40 2024 +0800

    [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter (#3240)
    
    * [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter
    
    * [AMORO-3239] Add the "ignore-filter-partition-count" parameter
    
    * move parameter "optimizer.ignore-filter-partition-count" to "self-optimizing.skip-filter-partition-count"
    
    * move parameter "self-optimizing.skip-filter-partition-count" to "refresh-tables.max-pending-partition-count"
---
 .../main/java/org/apache/amoro/server/AmoroManagementConf.java    | 6 ++++++
 .../apache/amoro/server/optimizing/plan/OptimizingEvaluator.java  | 6 +++++-
 .../apache/amoro/server/optimizing/plan/OptimizingPlanner.java    | 2 +-
 .../apache/amoro/server/table/executor/AsyncTableExecutors.java   | 3 ++-
 .../amoro/server/table/executor/TableRuntimeRefreshExecutor.java  | 8 ++++++--
 .../org/apache/amoro/server/TestDefaultOptimizingService.java     | 2 +-
 .../java/org/apache/amoro/server/dashboard/TestOverviewCache.java | 2 +-
 .../amoro/server/optimizing/plan/TestOptimizingEvaluator.java     | 2 +-
 .../org/apache/amoro/server/table/TestTableSummaryMetrics.java    | 2 +-
 dist/src/main/amoro-bin/conf/config.yaml                          | 1 +
 10 files changed, 25 insertions(+), 9 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 415995d81..f52a0113f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -162,6 +162,12 @@ public class AmoroManagementConf {
           .defaultValue(60000L)
           .withDescription("Interval for refreshing table metadata.");
 
+  public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
+      ConfigOptions.key("refresh-tables.max-pending-partition-count")
+          .intType()
+          .defaultValue(100)
+          .withDescription("Filters will not be used beyond that number of 
partitions");
+
   public static final ConfigOption<Long> BLOCKER_TIMEOUT =
       ConfigOptions.key("blocker.timeout")
           .longType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
index 9be5be797..22ebd838b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
@@ -62,15 +62,18 @@ public class OptimizingEvaluator {
   protected final MixedTable mixedTable;
   protected final TableRuntime tableRuntime;
   protected final TableSnapshot currentSnapshot;
+  protected final int maxPendingPartitions;
   protected boolean isInitialized = false;
 
   protected Map<String, PartitionEvaluator> needOptimizingPlanMap = 
Maps.newHashMap();
   protected Map<String, PartitionEvaluator> partitionPlanMap = 
Maps.newHashMap();
 
-  public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
+  public OptimizingEvaluator(
+      TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
     this.tableRuntime = tableRuntime;
     this.mixedTable = table;
     this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
+    this.maxPendingPartitions = maxPendingPartitions;
   }
 
   public TableRuntime getTableRuntime() {
@@ -137,6 +140,7 @@ public class OptimizingEvaluator {
     needOptimizingPlanMap.putAll(
         partitionPlanMap.entrySet().stream()
             .filter(entry -> entry.getValue().isNecessary())
+            .limit(maxPendingPartitions)
             .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue())));
   }
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
index 3e93129c8..c87a66c8a 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
@@ -65,7 +65,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
       MixedTable table,
       double availableCore,
       long maxInputSizePerThread) {
-    super(tableRuntime, table);
+    super(tableRuntime, table, Integer.MAX_VALUE);
     this.partitionFilter =
         tableRuntime.getPendingInput() == null
             ? Expressions.alwaysTrue()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
index a45385026..118523255 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
@@ -77,7 +77,8 @@ public class AsyncTableExecutors {
         new TableRuntimeRefreshExecutor(
             tableManager,
             conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
-            conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL));
+            conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
+            
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
     if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
       this.tagsAutoCreatingExecutor =
           new TagsAutoCreatingExecutor(
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
index 4fa3598a8..e613027f0 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
@@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends 
BaseTableExecutor {
 
   // 1 minutes
   private final long interval;
+  private final int maxPendingPartitions;
 
-  public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize, 
long interval) {
+  public TableRuntimeRefreshExecutor(
+      TableManager tableRuntimes, int poolSize, long interval, int 
maxPendingPartitions) {
     super(tableRuntimes, poolSize);
     this.interval = interval;
+    this.maxPendingPartitions = maxPendingPartitions;
   }
 
   @Override
@@ -48,7 +51,8 @@ public class TableRuntimeRefreshExecutor extends 
BaseTableExecutor {
 
   private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable 
table) {
     if (tableRuntime.isOptimizingEnabled() && 
!tableRuntime.getOptimizingStatus().isProcessing()) {
-      OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime, 
table);
+      OptimizingEvaluator evaluator =
+          new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
       if (evaluator.isNecessary()) {
         OptimizingEvaluator.PendingInput pendingInput = 
evaluator.getOptimizingPendingInput();
         logger.debug(
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 5a237700d..7945dd8ee 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -414,7 +414,7 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
   private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {
 
     public TableRuntimeRefresher() {
-      super(tableService(), 1, Integer.MAX_VALUE);
+      super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
     }
 
     void refreshPending() {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
index 1b59308b9..5ac3c85c3 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
@@ -109,7 +109,7 @@ public class TestOverviewCache extends AMSTableTestBase {
 
   void refreshPending() {
     TableRuntimeRefreshExecutor refresher =
-        new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
+        new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
     
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
     refresher.dispose();
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
index 63a853fe1..395aa653d 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
@@ -110,7 +110,7 @@ public class TestOptimizingEvaluator extends 
MixedTablePlanTestBase {
   }
 
   protected OptimizingEvaluator buildOptimizingEvaluator() {
-    return new OptimizingEvaluator(getTableRuntime(), getMixedTable());
+    return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
   }
 
   protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
index d5d42186c..bd2436ccb 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
@@ -142,7 +142,7 @@ public class TestTableSummaryMetrics extends 
AMSTableTestBase {
 
   void refreshPending() {
     TableRuntimeRefreshExecutor refresher =
-        new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
+        new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
     
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
     refresher.dispose();
   }
diff --git a/dist/src/main/amoro-bin/conf/config.yaml 
b/dist/src/main/amoro-bin/conf/config.yaml
index ae1de24e4..d887239bb 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -43,6 +43,7 @@ ams:
   refresh-tables:
     thread-count: 10
     interval: 60000 # 1min
+    max-pending-partition-count: 100 # default 100
 
   self-optimizing:
     commit-thread-count: 10

Reply via email to