This is an automated email from the ASF dual-hosted git repository.
jerry-024 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d0fae6e17d [flink] Fix StackOverflowError when building global index with many index columns and partitions (#7754)
d0fae6e17d is described below
commit d0fae6e17d6fcf74f012a1431e1d4aca6e0b7b07
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed May 6 14:33:29 2026 +0800
[flink] Fix StackOverflowError when building global index with many index columns and partitions (#7754)
---
.../paimon/flink/btree/BTreeIndexTopoBuilder.java | 14 ++--
.../paimon/flink/BTreeGlobalIndexITCase.java | 90 ++++++++++++++++++++++
2 files changed, 97 insertions(+), 7 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index dbdfdc8fe0..b03a6cdf7a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -85,7 +85,7 @@ public class BTreeIndexTopoBuilder {
PartitionPredicate partitionPredicate,
Options userOptions)
throws Exception {
- DataStream<Committable> allCommitMessages = null;
+ List<DataStream<Committable>> allStreams = new ArrayList<>();
for (String indexColumn : indexColumns) {
BTreeGlobalIndexBuilder indexBuilder =
indexBuilderSupplier.get().withIndexField(indexColumn);
@@ -160,15 +160,15 @@ public class BTreeIndexTopoBuilder {
recordsPerRange,
maxParallelism);
- allCommitMessages =
- allCommitMessages == null
- ? commitMessages
- : allCommitMessages.union(commitMessages);
+ allStreams.add(commitMessages);
}
}
}
- if (allCommitMessages != null) {
- commit(table, allCommitMessages);
+ if (!allStreams.isEmpty()) {
+ @SuppressWarnings("unchecked")
+ DataStream<Committable>[] rest =
+ allStreams.subList(1, allStreams.size()).toArray(new
DataStream[0]);
+ commit(table, allStreams.get(0).union(rest));
}
return true;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
index d6368da23f..9aaa39e446 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
@@ -23,11 +23,14 @@ import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -140,4 +143,91 @@ public class BTreeGlobalIndexITCase extends CatalogITCaseBase {
"CALL sys.create_global_index(`table` => 'default.%s',
index_column => '%s', index_type => 'btree')",
tableName, indexColumn);
}
+
+ @Test
+ void testBTreeIndexWithManyPartitions() throws
Catalog.TableNotExistException {
+ int numPartitions = 50;
+ sql(
+ "CREATE TABLE T_MANY_PT (pt INT, id INT, name STRING)
PARTITIONED BY (pt) WITH ("
+ + "'global-index.enabled' = 'true', "
+ + "'row-tracking.enabled' = 'true', "
+ + "'data-evolution.enabled' = 'true'"
+ + ")");
+
+ for (int p = 0; p < numPartitions; p++) {
+ insertPartitionRows("T_MANY_PT", p, p * 2, 2, "r_");
+ }
+
+ buildBTreeIndexForTable("T_MANY_PT", "id");
+
+ FileStoreTable table = paimonTable("T_MANY_PT");
+ long totalRowCount =
+ table.store().newIndexFileHandler().scanEntries().stream()
+ .filter(e -> "btree".equals(e.indexFile().indexType()))
+ .map(IndexManifestEntry::indexFile)
+ .mapToLong(IndexFileMeta::rowCount)
+ .sum();
+ assertThat(totalRowCount).isEqualTo((long) numPartitions * 2);
+ }
+
+ @Test
+ void testUnionDoesNotStackOverflow() throws InterruptedException {
+ int totalUnions = 1000;
+ long stackSize = 512 * 1024; // Flink JM default
+
+ // Chained union: result = result.union(new) — causes
StackOverflowError
+ AtomicReference<Throwable> chainedError = new AtomicReference<>();
+ Thread chainedThread =
+ new Thread(
+ null,
+ () -> {
+ try {
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<String> all = null;
+ for (int i = 0; i < totalUnions; i++) {
+ DataStream<String> s =
env.fromElements("item-" + i);
+ all = all == null ? s : all.union(s);
+ }
+ all.print();
+ env.getExecutionPlan();
+ } catch (Throwable t) {
+ chainedError.set(t);
+ }
+ },
+ "chained-union-test",
+ stackSize);
+ chainedThread.start();
+ chainedThread.join();
+ assertThat(chainedError.get()).isInstanceOf(StackOverflowError.class);
+
+ // Flat union: first.union(rest...) — no overflow at same stack size
+ AtomicReference<Throwable> flatError = new AtomicReference<>();
+ Thread flatThread =
+ new Thread(
+ null,
+ () -> {
+ try {
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment();
+ @SuppressWarnings("unchecked")
+ DataStream<String>[] streams = new
DataStream[totalUnions];
+ for (int i = 0; i < totalUnions; i++) {
+ streams[i] = env.fromElements("item-" + i);
+ }
+ @SuppressWarnings("unchecked")
+ DataStream<String>[] rest = new
DataStream[totalUnions - 1];
+ System.arraycopy(streams, 1, rest, 0,
totalUnions - 1);
+ streams[0].union(rest).print();
+ env.getExecutionPlan();
+ } catch (Throwable t) {
+ flatError.set(t);
+ }
+ },
+ "flat-union-test",
+ stackSize);
+ flatThread.start();
+ flatThread.join();
+ assertThat(flatError.get()).isNull();
+ }
}