This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f8402328b4 [core] Optimize Flink BTree index topology (#7852)
f8402328b4 is described below

commit f8402328b486a51b01d801025b26b73483924ad3
Author: YeJunHao <[email protected]>
AuthorDate: Sat May 23 22:48:16 2026 +0800

    [core] Optimize Flink BTree index topology (#7852)
    
    - Reworked Flink BTree global index building to use one task-driven
    topology for all contiguous row ranges instead of building one topology
    per range.
    - Added an internal build task id to the sort key so each range keeps
    its own row-range metadata while sharing the same Flink
    source/read/sort/write chain.
    - Added coverage for parallelism calculation, many small ranges, and a
    single large range split across multiple writer subtasks.
    
    ## Why
    
    When row ranges are highly fragmented, the old implementation creates a
    separate Flink topology for each range. That can make the create-index
    procedure spend a long time constructing the JobGraph and can produce an
    oversized topology.
---
 .../paimon/flink/btree/BTreeIndexTopoBuilder.java  | 242 +++++++++++++++------
 .../paimon/flink/BTreeGlobalIndexITCase.java       |  34 +++
 .../flink/btree/BTreeIndexTopoBuilderTest.java     |  61 ++++++
 3 files changed, 271 insertions(+), 66 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index b03a6cdf7a..e3cd198900 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -20,7 +20,9 @@ package org.apache.paimon.flink.btree;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
@@ -49,7 +51,9 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
@@ -63,8 +67,10 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -77,6 +83,9 @@ import static org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitB
 /** The {@link BTreeIndexTopoBuilder} for BTree index in Flink. */
 public class BTreeIndexTopoBuilder {
 
+    private static final String BUILD_TASK_ID_FIELD = "_BTREE_BUILD_TASK_ID";
+    private static final int BUILD_TASK_ID_FIELD_ID = -1;
+
     public static boolean buildIndex(
             StreamExecutionEnvironment env,
             Supplier<BTreeGlobalIndexBuilder> indexBuilderSupplier,
@@ -114,27 +123,34 @@ public class BTreeIndexTopoBuilder {
             List<String> selectedColumns = new ArrayList<>();
             selectedColumns.add(indexColumn);
 
-            RowType readType =
+            RowType dataReadType =
                     
SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));
-            int indexFieldPos = readType.getFieldIndex(indexColumn);
-            int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
-            DataType indexFieldType = readType.getTypeAt(indexFieldPos);
+            String buildTaskIdField = buildTaskIdFieldName(dataReadType);
+            RowType sortReadType = withBuildTaskId(dataReadType, 
buildTaskIdField);
+            int taskIdPos = sortReadType.getFieldIndex(buildTaskIdField);
+            int indexFieldPos = sortReadType.getFieldIndex(indexColumn);
+            int rowIdPos = 
sortReadType.getFieldIndex(SpecialFields.ROW_ID.name());
+            DataType indexFieldType = sortReadType.getTypeAt(indexFieldPos);
 
             // 3. Calculate maximum parallelism bound
             long recordsPerRange = 
userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
             int maxParallelism =
                     
userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
 
-            // 4. Build one topology per contiguous row range
+            // 4. Build one topology for all contiguous row ranges
             CoreOptions coreOptions = table.coreOptions();
-            ReadBuilder readBuilder = 
table.newReadBuilder().withReadType(readType);
+            ReadBuilder readBuilder = 
table.newReadBuilder().withReadType(dataReadType);
             List<String> sortColumns = new ArrayList<>();
+            sortColumns.add(buildTaskIdField);
             sortColumns.add(indexColumn);
             int partitionFieldSize = table.partitionKeys().size();
             BinaryRowSerializer binaryRowSerializer = new 
BinaryRowSerializer(partitionFieldSize);
+            List<BTreeBuildTask> buildTasks = new ArrayList<>();
+            List<BTreeSplitTask> splitTasks = new ArrayList<>();
             for (Map.Entry<BinaryRow, Map<Range, List<Split>>> partitionEntry :
                     partitionRangeSplits.entrySet()) {
                 BinaryRow partition = partitionEntry.getKey();
+                byte[] partitionBytes = 
binaryRowSerializer.serializeToBytes(partition);
                 for (Map.Entry<Range, List<Split>> entry : 
partitionEntry.getValue().entrySet()) {
                     Range range = entry.getKey();
                     List<Split> rangeSplits = entry.getValue();
@@ -142,27 +158,37 @@ public class BTreeIndexTopoBuilder {
                         continue;
                     }
 
-                    DataStream<Committable> commitMessages =
-                            executeForPartitionRange(
-                                    env,
-                                    range,
-                                    rangeSplits,
-                                    readBuilder,
-                                    indexBuilder,
-                                    partitionFieldSize,
-                                    
binaryRowSerializer.serializeToBytes(partition),
-                                    indexFieldPos,
-                                    rowIdPos,
-                                    indexFieldType,
-                                    sortColumns,
-                                    coreOptions,
-                                    readType,
-                                    recordsPerRange,
-                                    maxParallelism);
-
-                    allStreams.add(commitMessages);
+                    int taskId = buildTasks.size();
+                    buildTasks.add(new BTreeBuildTask(taskId, range, 
partitionBytes));
+                    for (Split split : rangeSplits) {
+                        splitTasks.add(new BTreeSplitTask(taskId, split));
+                    }
                 }
             }
+
+            if (buildTasks.isEmpty()) {
+                return false;
+            }
+
+            DataStream<Committable> commitMessages =
+                    executeForBuildTasks(
+                            env,
+                            buildTasks,
+                            splitTasks,
+                            readBuilder,
+                            indexBuilder,
+                            partitionFieldSize,
+                            taskIdPos,
+                            indexFieldPos,
+                            rowIdPos,
+                            indexFieldType,
+                            sortColumns,
+                            coreOptions,
+                            sortReadType,
+                            recordsPerRange,
+                            maxParallelism);
+
+            allStreams.add(commitMessages);
         }
         if (!allStreams.isEmpty()) {
             @SuppressWarnings("unchecked")
@@ -192,14 +218,14 @@ public class BTreeIndexTopoBuilder {
         }
     }
 
-    protected static DataStream<Committable> executeForPartitionRange(
+    protected static DataStream<Committable> executeForBuildTasks(
             StreamExecutionEnvironment env,
-            Range range,
-            List<Split> rangeSplits,
+            List<BTreeBuildTask> buildTasks,
+            List<BTreeSplitTask> splitTasks,
             ReadBuilder readBuilder,
             BTreeGlobalIndexBuilder indexBuilder,
             int partitionFieldSize,
-            byte[] partition,
+            int taskIdPos,
             int indexFieldPos,
             int rowIdPos,
             DataType indexFieldType,
@@ -208,21 +234,18 @@ public class BTreeIndexTopoBuilder {
             RowType readType,
             long recordsPerRange,
             int maxParallelism) {
-        int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
-        parallelism = Math.min(parallelism, maxParallelism);
+        int parallelism = calculateParallelism(buildTasks, recordsPerRange, 
maxParallelism);
 
-        DataStream<Split> sourceStream =
+        DataStream<BTreeSplitTask> sourceStream =
                 StreamExecutionEnvironmentUtils.fromData(
-                                env,
-                                new JavaTypeInfo<>(Split.class),
-                                rangeSplits.toArray(new Split[0]))
-                        .name("Global Index Source " + " range=" + range)
+                                env, splitTasks, new 
JavaTypeInfo<>(BTreeSplitTask.class))
+                        .name("Global Index Source")
                         .setParallelism(1);
 
         DataStream<RowData> rowDataStream =
                 sourceStream
                         .transform(
-                                "Read Data " + range,
+                                "Read Data",
                                 
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(readType)),
                                 new ReadDataOperator(readBuilder))
                         .setParallelism(parallelism);
@@ -243,19 +266,51 @@ public class BTreeIndexTopoBuilder {
 
         return sortedStream
                 .transform(
-                        "write-btree-index " + range,
+                        "write-btree-index",
                         new CommittableTypeInfo(),
                         new WriteIndexOperator(
-                                range,
+                                buildTasks,
                                 partitionFieldSize,
-                                partition,
                                 indexBuilder,
+                                taskIdPos,
                                 indexFieldPos,
                                 rowIdPos,
                                 indexFieldType))
                 .setParallelism(parallelism);
     }
 
+    static int calculateParallelism(
+            List<BTreeBuildTask> buildTasks, long recordsPerRange, int 
maxParallelism) {
+        long totalRecords = 0;
+        for (BTreeBuildTask task : buildTasks) {
+            long count = task.rowRange.count();
+            if (Long.MAX_VALUE - totalRecords < count) {
+                totalRecords = Long.MAX_VALUE;
+            } else {
+                totalRecords += count;
+            }
+        }
+
+        long parallelism = Math.max(totalRecords / recordsPerRange, 1);
+        return (int) Math.min(parallelism, maxParallelism);
+    }
+
+    private static String buildTaskIdFieldName(RowType readType) {
+        String fieldName = BUILD_TASK_ID_FIELD;
+        while (readType.containsField(fieldName)) {
+            fieldName = "_" + fieldName;
+        }
+        return fieldName;
+    }
+
+    private static RowType withBuildTaskId(RowType readType, String 
buildTaskIdField) {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(
+                new DataField(BUILD_TASK_ID_FIELD_ID, buildTaskIdField, 
DataTypes.INT().notNull()));
+        fields.addAll(readType.getFields());
+        return new RowType(readType.isNullable(), fields);
+    }
+
     private static void commit(FileStoreTable table, DataStream<Committable> 
written) {
         OneInputStreamOperatorFactory<Committable, Committable> 
committerOperator =
                 new CommitterOperatorFactory<>(
@@ -276,7 +331,7 @@ public class BTreeIndexTopoBuilder {
     private static class ReadDataOperator
             extends 
org.apache.flink.table.runtime.operators.TableStreamOperator<RowData>
             implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator<
-                    Split, RowData> {
+                    BTreeSplitTask, RowData> {
 
         private static final long serialVersionUID = 1L;
 
@@ -295,43 +350,50 @@ public class BTreeIndexTopoBuilder {
         }
 
         @Override
-        public void processElement(StreamRecord<Split> element) throws 
Exception {
-            Split split = element.getValue();
-            try (RecordReader<InternalRow> reader = 
tableRead.createReader(split)) {
+        public void processElement(StreamRecord<BTreeSplitTask> element) 
throws Exception {
+            BTreeSplitTask buildTask = element.getValue();
+            GenericRow taskId = GenericRow.of(buildTask.taskId);
+            try (RecordReader<InternalRow> reader = 
tableRead.createReader(buildTask.split)) {
                 reader.forEachRemaining(
-                        row -> output.collect(new StreamRecord<>(new 
FlinkRowData(row))));
+                        row ->
+                                output.collect(
+                                        new StreamRecord<>(
+                                                new FlinkRowData(new 
JoinedRow(taskId, row)))));
             }
         }
     }
 
     private static class WriteIndexOperator extends 
BoundedOneInputOperator<RowData, Committable> {
 
-        private final Range rowRange;
-        private final byte[] partition;
+        private final List<BTreeBuildTask> buildTasks;
         private final int partitionFieldSize;
         private final BTreeGlobalIndexBuilder builder;
+        private final int taskIdPos;
         private final int indexFieldPos;
         private final int rowIdPos;
         private final DataType indexFieldType;
 
         private transient long counter;
+        private transient BTreeBuildTask currentTask;
+        private transient BinaryRow currentPartition;
         private transient GlobalIndexParallelWriter currentWriter;
         private transient List<CommitMessage> commitMessages;
+        private transient Map<Integer, BTreeBuildTask> buildTasksById;
         private transient InternalRow.FieldGetter indexFieldGetter;
         private transient BinaryRowSerializer binaryRowSerializer;
 
         public WriteIndexOperator(
-                Range rowRange,
+                List<BTreeBuildTask> buildTasks,
                 int partitionFieldSize,
-                byte[] partition,
                 BTreeGlobalIndexBuilder builder,
+                int taskIdPos,
                 int indexFieldPos,
                 int rowIdPos,
                 DataType indexFieldType) {
-            this.rowRange = rowRange;
+            this.buildTasks = buildTasks;
             this.partitionFieldSize = partitionFieldSize;
-            this.partition = partition;
             this.builder = builder;
+            this.taskIdPos = taskIdPos;
             this.indexFieldPos = indexFieldPos;
             this.rowIdPos = rowIdPos;
             this.indexFieldType = indexFieldType;
@@ -341,6 +403,10 @@ public class BTreeIndexTopoBuilder {
         public void open() throws Exception {
             super.open();
             commitMessages = new ArrayList<>();
+            buildTasksById = new HashMap<>();
+            for (BTreeBuildTask task : buildTasks) {
+                buildTasksById.put(task.taskId, task);
+            }
             indexFieldGetter = InternalRow.createFieldGetter(indexFieldType, 
indexFieldPos);
             this.binaryRowSerializer = new 
BinaryRowSerializer(partitionFieldSize);
         }
@@ -348,14 +414,20 @@ public class BTreeIndexTopoBuilder {
         @Override
         public void processElement(StreamRecord<RowData> element) throws 
IOException {
             InternalRow row = new FlinkRowWrapper(element.getValue());
+            int taskId = row.getInt(taskIdPos);
+            BTreeBuildTask task = buildTasksById.get(taskId);
+            if (task == null) {
+                throw new IllegalArgumentException("Unknown BTree build task 
id: " + taskId);
+            }
+
+            if (currentTask == null || currentTask.taskId != taskId) {
+                flushCurrentWriter();
+                currentTask = task;
+                currentPartition = 
binaryRowSerializer.deserializeFromBytes(task.partition);
+            }
+
             if (currentWriter != null && counter >= builder.recordsPerRange()) 
{
-                commitMessages.add(
-                        builder.flushIndex(
-                                rowRange,
-                                currentWriter.finish(),
-                                
binaryRowSerializer.deserializeFromBytes(partition)));
-                currentWriter = null;
-                counter = 0;
+                flushCurrentWriter();
             }
 
             counter++;
@@ -364,19 +436,13 @@ public class BTreeIndexTopoBuilder {
                 currentWriter = builder.createWriter();
             }
 
-            long localRowId = row.getLong(rowIdPos) - rowRange.from;
+            long localRowId = row.getLong(rowIdPos) - 
currentTask.rowRange.from;
             currentWriter.write(indexFieldGetter.getFieldOrNull(row), 
localRowId);
         }
 
         @Override
         public void endInput() throws IOException {
-            if (counter > 0) {
-                commitMessages.add(
-                        builder.flushIndex(
-                                rowRange,
-                                currentWriter.finish(),
-                                
binaryRowSerializer.deserializeFromBytes(partition)));
-            }
+            flushCurrentWriter();
             for (CommitMessage message : commitMessages) {
                 output.collect(
                         new StreamRecord<>(
@@ -384,5 +450,49 @@ public class BTreeIndexTopoBuilder {
             }
             commitMessages.clear();
         }
+
+        private void flushCurrentWriter() throws IOException {
+            if (counter > 0 && currentWriter != null) {
+                commitMessages.add(
+                        builder.flushIndex(
+                                currentTask.rowRange, currentWriter.finish(), 
currentPartition));
+            }
+            currentWriter = null;
+            counter = 0;
+        }
+    }
+
+    /** Metadata for one BTree index build range. */
+    public static class BTreeBuildTask implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private int taskId;
+        private Range rowRange;
+        private byte[] partition;
+
+        public BTreeBuildTask() {}
+
+        BTreeBuildTask(int taskId, Range rowRange, byte[] partition) {
+            this.taskId = taskId;
+            this.rowRange = rowRange;
+            this.partition = partition;
+        }
+    }
+
+    /** Split assigned to one BTree index build task. */
+    public static class BTreeSplitTask implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private int taskId;
+        private Split split;
+
+        public BTreeSplitTask() {}
+
+        BTreeSplitTask(int taskId, Split split) {
+            this.taskId = taskId;
+            this.split = split;
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
index 9aaa39e446..9433034d39 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
@@ -144,6 +144,40 @@ public class BTreeGlobalIndexITCase extends CatalogITCaseBase {
                 tableName, indexColumn);
     }
 
+    @Test
+    void testBTreeIndexWithSingleRangeAndParallelWriters() throws 
Catalog.TableNotExistException {
+        sql(
+                "CREATE TABLE T_SINGLE_RANGE_PARALLEL (id INT, name STRING) 
WITH ("
+                        + "'global-index.enabled' = 'true', "
+                        + "'row-tracking.enabled' = 'true', "
+                        + "'data-evolution.enabled' = 'true'"
+                        + ")");
+        String values =
+                IntStream.range(0, 2_000)
+                        .mapToObj(i -> String.format("(%s, %s)", i, "'name_" + 
i + "'"))
+                        .collect(Collectors.joining(","));
+        sql("INSERT INTO T_SINGLE_RANGE_PARALLEL VALUES " + values);
+        sql(
+                "CALL sys.create_global_index(`table` => 
'default.T_SINGLE_RANGE_PARALLEL', "
+                        + "index_column => 'id', index_type => 'btree', "
+                        + "options => 'btree-index.records-per-range=100;"
+                        + "btree-index.build.max-parallelism=4')");
+
+        FileStoreTable table = paimonTable("T_SINGLE_RANGE_PARALLEL");
+        List<IndexFileMeta> btreeEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .filter(f -> "btree".equals(f.indexType()))
+                        .collect(Collectors.toList());
+
+        long totalRowCount = 
btreeEntries.stream().mapToLong(IndexFileMeta::rowCount).sum();
+        assertThat(btreeEntries).hasSizeGreaterThan(1);
+        assertThat(totalRowCount).isEqualTo(2_000L);
+
+        assertThat(sql("SELECT * FROM T_SINGLE_RANGE_PARALLEL WHERE id = 
1500"))
+                .containsOnly(Row.of(1500, "name_1500"));
+    }
+
     @Test
     void testBTreeIndexWithManyPartitions() throws 
Catalog.TableNotExistException {
         int numPartitions = 50;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java
new file mode 100644
index 0000000000..107e0ac048
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.btree;
+
+import org.apache.paimon.flink.btree.BTreeIndexTopoBuilder.BTreeBuildTask;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BTreeIndexTopoBuilder}. */
+public class BTreeIndexTopoBuilderTest {
+
+    @Test
+    public void testCalculateParallelismByTotalRowsInsteadOfRangeCount() {
+        List<BTreeBuildTask> tasks = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            tasks.add(new BTreeBuildTask(i, new Range(i * 10L, i * 10L + 9), 
new byte[0]));
+        }
+
+        assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L, 
4096)).isEqualTo(1);
+    }
+
+    @Test
+    public void testCalculateParallelismHonorsMaxParallelism() {
+        List<BTreeBuildTask> tasks = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            tasks.add(new BTreeBuildTask(i, new Range(i * 1000L, i * 1000L + 
999), new byte[0]));
+        }
+
+        assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L, 
16)).isEqualTo(16);
+    }
+
+    @Test
+    public void testCalculateParallelismKeepsSingleRangeBehavior() {
+        List<BTreeBuildTask> tasks = new ArrayList<>();
+        tasks.add(new BTreeBuildTask(0, new Range(0, 1499), new byte[0]));
+
+        assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L, 
16)).isEqualTo(1);
+    }
+}

Reply via email to