This is an automated email from the ASF dual-hosted git repository.
nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new cc2968800 [AMORO-3239] Fix stack overflow caused by reading too many
partitions in the filter (#3240)
cc2968800 is described below
commit cc29688004ecd0b24b5bbafaa2f8326cb19a087e
Author: 7hong <[email protected]>
AuthorDate: Wed Oct 16 20:31:40 2024 +0800
[AMORO-3239] Fix stack overflow caused by reading too many partitions in
the filter (#3240)
* [AMORO-3239] Fix stack overflow caused by reading too many partitions in
the filter
* [AMORO-3239] Add the "ignore-filter-partition-count" parameter
* Rename parameter "optimizer.ignore-filter-partition-count" to
"self-optimizing.skip-filter-partition-count"
* Rename parameter "self-optimizing.skip-filter-partition-count" to
"refresh-tables.max-pending-partition-count"
---
.../main/java/org/apache/amoro/server/AmoroManagementConf.java | 6 ++++++
.../apache/amoro/server/optimizing/plan/OptimizingEvaluator.java | 6 +++++-
.../apache/amoro/server/optimizing/plan/OptimizingPlanner.java | 2 +-
.../apache/amoro/server/table/executor/AsyncTableExecutors.java | 3 ++-
.../amoro/server/table/executor/TableRuntimeRefreshExecutor.java | 8 ++++++--
.../org/apache/amoro/server/TestDefaultOptimizingService.java | 2 +-
.../java/org/apache/amoro/server/dashboard/TestOverviewCache.java | 2 +-
.../amoro/server/optimizing/plan/TestOptimizingEvaluator.java | 2 +-
.../org/apache/amoro/server/table/TestTableSummaryMetrics.java | 2 +-
dist/src/main/amoro-bin/conf/config.yaml | 1 +
10 files changed, 25 insertions(+), 9 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 415995d81..f52a0113f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -162,6 +162,12 @@ public class AmoroManagementConf {
.defaultValue(60000L)
.withDescription("Interval for refreshing table metadata.");
+ public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
+ ConfigOptions.key("refresh-tables.max-pending-partition-count")
+ .intType()
+ .defaultValue(100)
+ .withDescription("Filters will not be used beyond that number of
partitions");
+
public static final ConfigOption<Long> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
.longType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
index 9be5be797..22ebd838b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
@@ -62,15 +62,18 @@ public class OptimizingEvaluator {
protected final MixedTable mixedTable;
protected final TableRuntime tableRuntime;
protected final TableSnapshot currentSnapshot;
+ protected final int maxPendingPartitions;
protected boolean isInitialized = false;
protected Map<String, PartitionEvaluator> needOptimizingPlanMap =
Maps.newHashMap();
protected Map<String, PartitionEvaluator> partitionPlanMap =
Maps.newHashMap();
- public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
+ public OptimizingEvaluator(
+ TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
this.tableRuntime = tableRuntime;
this.mixedTable = table;
this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
+ this.maxPendingPartitions = maxPendingPartitions;
}
public TableRuntime getTableRuntime() {
@@ -137,6 +140,7 @@ public class OptimizingEvaluator {
needOptimizingPlanMap.putAll(
partitionPlanMap.entrySet().stream()
.filter(entry -> entry.getValue().isNecessary())
+ .limit(maxPendingPartitions)
.collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue())));
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
index 3e93129c8..c87a66c8a 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
@@ -65,7 +65,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
- super(tableRuntime, table);
+ super(tableRuntime, table, Integer.MAX_VALUE);
this.partitionFilter =
tableRuntime.getPendingInput() == null
? Expressions.alwaysTrue()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
index a45385026..118523255 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
@@ -77,7 +77,8 @@ public class AsyncTableExecutors {
new TableRuntimeRefreshExecutor(
tableManager,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
- conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL));
+ conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
+
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
index 4fa3598a8..e613027f0 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
@@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends
BaseTableExecutor {
// 1 minutes
private final long interval;
+ private final int maxPendingPartitions;
- public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize,
long interval) {
+ public TableRuntimeRefreshExecutor(
+ TableManager tableRuntimes, int poolSize, long interval, int
maxPendingPartitions) {
super(tableRuntimes, poolSize);
this.interval = interval;
+ this.maxPendingPartitions = maxPendingPartitions;
}
@Override
@@ -48,7 +51,8 @@ public class TableRuntimeRefreshExecutor extends
BaseTableExecutor {
private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable
table) {
if (tableRuntime.isOptimizingEnabled() &&
!tableRuntime.getOptimizingStatus().isProcessing()) {
- OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime,
table);
+ OptimizingEvaluator evaluator =
+ new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
if (evaluator.isNecessary()) {
OptimizingEvaluator.PendingInput pendingInput =
evaluator.getOptimizingPendingInput();
logger.debug(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 5a237700d..7945dd8ee 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -414,7 +414,7 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {
public TableRuntimeRefresher() {
- super(tableService(), 1, Integer.MAX_VALUE);
+ super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
void refreshPending() {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
index 1b59308b9..5ac3c85c3 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java
@@ -109,7 +109,7 @@ public class TestOverviewCache extends AMSTableTestBase {
void refreshPending() {
TableRuntimeRefreshExecutor refresher =
- new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
+ new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE,
Integer.MAX_VALUE);
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
index 63a853fe1..395aa653d 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
@@ -110,7 +110,7 @@ public class TestOptimizingEvaluator extends
MixedTablePlanTestBase {
}
protected OptimizingEvaluator buildOptimizingEvaluator() {
- return new OptimizingEvaluator(getTableRuntime(), getMixedTable());
+ return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
}
protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
index d5d42186c..bd2436ccb 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
@@ -142,7 +142,7 @@ public class TestTableSummaryMetrics extends
AMSTableTestBase {
void refreshPending() {
TableRuntimeRefreshExecutor refresher =
- new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
+ new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE,
Integer.MAX_VALUE);
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index ae1de24e4..d887239bb 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -43,6 +43,7 @@ ams:
refresh-tables:
thread-count: 10
interval: 60000 # 1min
+ max-pending-partition-count: 100 # default 100
self-optimizing:
commit-thread-count: 10