This is an automated email from the ASF dual-hosted git repository.

jerry-024 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d0fae6e17d [flink] Fix StackOverflowError when building global index with many index columns and partitions (#7754)
d0fae6e17d is described below

commit d0fae6e17d6fcf74f012a1431e1d4aca6e0b7b07
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed May 6 14:33:29 2026 +0800

    [flink] Fix StackOverflowError when building global index with many index columns and partitions (#7754)
---
 .../paimon/flink/btree/BTreeIndexTopoBuilder.java  | 14 ++--
 .../paimon/flink/BTreeGlobalIndexITCase.java       | 90 ++++++++++++++++++++++
 2 files changed, 97 insertions(+), 7 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index dbdfdc8fe0..b03a6cdf7a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -85,7 +85,7 @@ public class BTreeIndexTopoBuilder {
             PartitionPredicate partitionPredicate,
             Options userOptions)
             throws Exception {
-        DataStream<Committable> allCommitMessages = null;
+        List<DataStream<Committable>> allStreams = new ArrayList<>();
         for (String indexColumn : indexColumns) {
             BTreeGlobalIndexBuilder indexBuilder =
                     indexBuilderSupplier.get().withIndexField(indexColumn);
@@ -160,15 +160,15 @@ public class BTreeIndexTopoBuilder {
                                     recordsPerRange,
                                     maxParallelism);
 
-                    allCommitMessages =
-                            allCommitMessages == null
-                                    ? commitMessages
-                                    : allCommitMessages.union(commitMessages);
+                    allStreams.add(commitMessages);
                 }
             }
         }
-        if (allCommitMessages != null) {
-            commit(table, allCommitMessages);
+        if (!allStreams.isEmpty()) {
+            @SuppressWarnings("unchecked")
+            DataStream<Committable>[] rest =
+                    allStreams.subList(1, allStreams.size()).toArray(new DataStream[0]);
+            commit(table, allStreams.get(0).union(rest));
         }
 
         return true;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
index d6368da23f..9aaa39e446 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
@@ -23,11 +23,14 @@ import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.table.FileStoreTable;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -140,4 +143,91 @@ public class BTreeGlobalIndexITCase extends CatalogITCaseBase {
                 "CALL sys.create_global_index(`table` => 'default.%s', index_column => '%s', index_type => 'btree')",
                 tableName, indexColumn);
     }
+
+    @Test
+    void testBTreeIndexWithManyPartitions() throws Catalog.TableNotExistException {
+        int numPartitions = 50;
+        sql(
+                "CREATE TABLE T_MANY_PT (pt INT, id INT, name STRING) PARTITIONED BY (pt) WITH ("
+                        + "'global-index.enabled' = 'true', "
+                        + "'row-tracking.enabled' = 'true', "
+                        + "'data-evolution.enabled' = 'true'"
+                        + ")");
+
+        for (int p = 0; p < numPartitions; p++) {
+            insertPartitionRows("T_MANY_PT", p, p * 2, 2, "r_");
+        }
+
+        buildBTreeIndexForTable("T_MANY_PT", "id");
+
+        FileStoreTable table = paimonTable("T_MANY_PT");
+        long totalRowCount =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(e -> "btree".equals(e.indexFile().indexType()))
+                        .map(IndexManifestEntry::indexFile)
+                        .mapToLong(IndexFileMeta::rowCount)
+                        .sum();
+        assertThat(totalRowCount).isEqualTo((long) numPartitions * 2);
+    }
+
+    @Test
+    void testUnionDoesNotStackOverflow() throws InterruptedException {
+        int totalUnions = 1000;
+        long stackSize = 512 * 1024; // Flink JM default
+
+        // Chained union: result = result.union(new) — causes StackOverflowError
+        AtomicReference<Throwable> chainedError = new AtomicReference<>();
+        Thread chainedThread =
+                new Thread(
+                        null,
+                        () -> {
+                            try {
+                                StreamExecutionEnvironment env =
+                                        StreamExecutionEnvironment.getExecutionEnvironment();
+                                DataStream<String> all = null;
+                                for (int i = 0; i < totalUnions; i++) {
+                                    DataStream<String> s = env.fromElements("item-" + i);
+                                    all = all == null ? s : all.union(s);
+                                }
+                                all.print();
+                                env.getExecutionPlan();
+                            } catch (Throwable t) {
+                                chainedError.set(t);
+                            }
+                        },
+                        "chained-union-test",
+                        stackSize);
+        chainedThread.start();
+        chainedThread.join();
+        assertThat(chainedError.get()).isInstanceOf(StackOverflowError.class);
+
+        // Flat union: first.union(rest...) — no overflow at same stack size
+        AtomicReference<Throwable> flatError = new AtomicReference<>();
+        Thread flatThread =
+                new Thread(
+                        null,
+                        () -> {
+                            try {
+                                StreamExecutionEnvironment env =
+                                        StreamExecutionEnvironment.getExecutionEnvironment();
+                                @SuppressWarnings("unchecked")
+                                DataStream<String>[] streams = new DataStream[totalUnions];
+                                for (int i = 0; i < totalUnions; i++) {
+                                    streams[i] = env.fromElements("item-" + i);
+                                }
+                                @SuppressWarnings("unchecked")
+                                DataStream<String>[] rest = new DataStream[totalUnions - 1];
+                                System.arraycopy(streams, 1, rest, 0, totalUnions - 1);
+                                streams[0].union(rest).print();
+                                env.getExecutionPlan();
+                            } catch (Throwable t) {
+                                flatError.set(t);
+                            }
+                        },
+                        "flat-union-test",
+                        stackSize);
+        flatThread.start();
+        flatThread.join();
+        assertThat(flatError.get()).isNull();
+    }
 }

Reply via email to