This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 18a770106f [flink] [spark] Fix btree procedure GlobalIndexMeta row range not accurate (#7354)
18a770106f is described below

commit 18a770106f9360e1755a5b17a066d9a4cc3c2a49
Author: YeJunHao <[email protected]>
AuthorDate: Tue Mar 10 18:08:43 2026 +0800

    [flink] [spark] Fix btree procedure GlobalIndexMeta row range not accurate (#7354)
---
 .../globalindex/GlobalIndexBuilderUtils.java       |  11 +-
 .../globalindex/btree/BTreeGlobalIndexBuilder.java | 228 ++++++++++++++++-----
 .../test/java/org/apache/paimon/JavaPyE2ETest.java |   9 +-
 .../btree/BTreeGlobalIndexBuilderSplitTest.java    |  91 ++++++++
 .../btree/BTreeGlobalIndexBuilderTest.java         |   7 +-
 .../paimon/table/BtreeGlobalIndexTableTest.java    |  23 ++-
 .../paimon/flink/btree/BTreeIndexTopoBuilder.java  | 198 +++++++++++++-----
 .../paimon/flink/BTreeGlobalIndexITCase.java       |  74 +++++++
 .../globalindex/DefaultGlobalIndexBuilder.java     |   9 +-
 .../globalindex/btree/BTreeIndexTopoBuilder.java   | 137 ++++++++-----
 .../procedure/CreateGlobalIndexProcedure.java      |   6 +-
 .../procedure/CreateGlobalIndexProcedureTest.scala |  11 +-
 12 files changed, 617 insertions(+), 187 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
index b53a0641ab..085423efa8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.globalindex;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileMeta;
@@ -35,7 +37,9 @@ import java.util.List;
 public class GlobalIndexBuilderUtils {
 
     public static List<IndexFileMeta> toIndexFileMetas(
-            FileStoreTable table,
+            FileIO fileIO,
+            IndexPathFactory indexPathFactory,
+            CoreOptions options,
             Range range,
             int indexFieldId,
             String indexType,
@@ -44,12 +48,11 @@ public class GlobalIndexBuilderUtils {
         List<IndexFileMeta> results = new ArrayList<>();
         for (ResultEntry entry : entries) {
             String fileName = entry.fileName();
-            GlobalIndexFileReadWrite readWrite = 
createGlobalIndexFileReadWrite(table);
-            long fileSize = readWrite.fileSize(fileName);
+            long fileSize = 
fileIO.getFileSize(indexPathFactory.toPath(fileName));
             GlobalIndexMeta globalIndexMeta =
                     new GlobalIndexMeta(range.from, range.to, indexFieldId, 
null, entry.meta());
 
-            Path externalPathDir = 
table.coreOptions().globalIndexExternalPath();
+            Path externalPathDir = options.globalIndexExternalPath();
             String externalPathString = null;
             if (externalPathDir != null) {
                 Path externalPath = new Path(externalPathDir, fileName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index 08b31d06b4..4c051741b2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -19,13 +19,14 @@
 package org.apache.paimon.globalindex.btree;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
 import org.apache.paimon.globalindex.GlobalIndexWriter;
 import org.apache.paimon.globalindex.ResultEntry;
-import org.apache.paimon.globalindex.RowIdIndexFieldsExtractor;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
@@ -46,8 +47,10 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.MutableObjectIteratorAdapter;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RangeHelper;
 
 import javax.annotation.Nullable;
 
@@ -55,9 +58,13 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
@@ -82,13 +89,12 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
 
     // readRowType is composed by partition fields, indexed field and _ROW_ID 
field
     private RowType readRowType;
-    private RowIdIndexFieldsExtractor extractor;
 
     @Nullable private PartitionPredicate partitionPredicate;
 
     public BTreeGlobalIndexBuilder(Table table) {
         this.table = (FileStoreTable) table;
-        this.rowType = table.rowType();
+        this.rowType = this.table.rowType();
         this.options = this.table.coreOptions().toConfiguration();
         this.recordsPerRange =
                 (long) 
(options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
@@ -110,14 +116,11 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
                 indexField,
                 table.fullName());
         this.indexField = rowType.getField(indexField);
-        List<String> readColumns = new ArrayList<>(table.partitionKeys());
-        readColumns.addAll(
-                SpecialFields.rowTypeWithRowId(new 
RowType(singletonList(this.indexField)))
-                        .getFieldNames());
+        List<String> readColumns = new ArrayList<>();
+        readColumns.add(this.indexField.name());
+        readColumns.add(SpecialFields.ROW_ID.name());
+
         this.readRowType = 
SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns);
-        this.extractor =
-                new RowIdIndexFieldsExtractor(
-                        this.readRowType, table.partitionKeys(), 
this.indexField.name());
         return this;
     }
 
@@ -136,12 +139,10 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
         return snapshotReader.read().dataSplits();
     }
 
-    public List<CommitMessage> build(List<DataSplit> splits, IOManager 
ioManager)
-            throws IOException {
-        Range rowRange = calcRowRange(splits);
-        if (splits.isEmpty() || rowRange == null) {
-            return Collections.emptyList();
-        }
+    @VisibleForTesting
+    public List<CommitMessage> build(DataSplit split, IOManager ioManager) 
throws IOException {
+        BinaryRow partition = split.partition();
+        Range rowRange = calcRowRange(split);
 
         CoreOptions options = new CoreOptions(this.options);
         BinaryExternalSortBuffer buffer =
@@ -157,7 +158,7 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
                         options.writeBufferSpillDiskSize(),
                         options.sequenceFieldSortOrderIsAscending());
 
-        List<Split> splitList = new ArrayList<>(splits);
+        List<Split> splitList = Collections.singletonList(split);
         RecordReader<InternalRow> reader =
                 
table.newReadBuilder().withReadType(readRowType).newRead().createReader(splitList);
         try (CloseableIterator<InternalRow> iterator = 
reader.toCloseableIterator()) {
@@ -172,7 +173,7 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
         Iterator<InternalRow> iterator =
                 new MutableObjectIteratorAdapter<>(
                         buffer.sortedIterator(), new 
BinaryRow(readRowType.getFieldCount()));
-        List<CommitMessage> result = build(rowRange, iterator);
+        List<CommitMessage> result = buildForSinglePartition(rowRange, 
partition, iterator);
         buffer.clear();
 
         return result;
@@ -182,47 +183,34 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
         return recordsPerRange;
     }
 
-    public RowIdIndexFieldsExtractor extractor() {
-        return extractor;
-    }
-
-    public List<CommitMessage> build(Range rowRange, Iterator<InternalRow> 
data)
-            throws IOException {
+    public List<CommitMessage> buildForSinglePartition(
+            Range rowRange, BinaryRow partition, Iterator<InternalRow> data) 
throws IOException {
         long counter = 0;
-        BinaryRow currentPart = null;
         GlobalIndexParallelWriter currentWriter = null;
         List<CommitMessage> commitMessages = new ArrayList<>();
+        FieldGetter indexFieldGetter = 
InternalRow.createFieldGetter(indexField.type(), 0);
 
         while (data.hasNext()) {
             InternalRow row = data.next();
-            BinaryRow partRow = extractor.extractPartition(row);
-
-            // the input is sorted by <partition, indexedField>
-            if (currentWriter != null) {
-                if (!Objects.equals(partRow, currentPart) || counter >= 
recordsPerRange) {
-                    commitMessages.add(flushIndex(rowRange, 
currentWriter.finish(), currentPart));
-                    currentWriter = null;
-                    counter = 0;
-                }
+
+            if (currentWriter != null && counter >= recordsPerRange) {
+                commitMessages.add(flushIndex(rowRange, 
currentWriter.finish(), partition));
+                currentWriter = null;
+                counter = 0;
             }
 
-            // write <value, rowId> pair to index file
-            currentPart = partRow;
             counter++;
-
             if (currentWriter == null) {
                 currentWriter = createWriter();
             }
 
-            // convert the original rowId to local rowId
-            long localRowId = extractor.extractRowId(row) - rowRange.from;
-            currentWriter.write(extractor.extractIndexField(row), localRowId);
+            long localRowId = row.getLong(1) - rowRange.from;
+            currentWriter.write(indexFieldGetter.getFieldOrNull(row), 
localRowId);
         }
 
         if (counter > 0) {
-            commitMessages.add(flushIndex(rowRange, currentWriter.finish(), 
currentPart));
+            commitMessages.add(flushIndex(rowRange, currentWriter.finish(), 
partition));
         }
-
         return commitMessages;
     }
 
@@ -242,22 +230,160 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
             Range rowRange, List<ResultEntry> resultEntries, BinaryRow 
partition)
             throws IOException {
         List<IndexFileMeta> indexFileMetas =
-                toIndexFileMetas(table, rowRange, indexField.id(), indexType, 
resultEntries);
+                toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange,
+                        indexField.id(),
+                        indexType,
+                        resultEntries);
         DataIncrement dataIncrement = 
DataIncrement.indexIncrement(indexFileMetas);
         return new CommitMessageImpl(
                 partition, 0, null, dataIncrement, 
CompactIncrement.emptyIncrement());
     }
 
-    public static Range calcRowRange(List<DataSplit> dataSplits) {
-        long start = Long.MAX_VALUE;
-        long end = Long.MIN_VALUE;
+    public static Range calcRowRange(DataSplit dataSplit) {
+        List<Range> ranges = calcRowRanges(singletonList(dataSplit));
+        if (ranges.isEmpty()) {
+            return null;
+        }
+        return new Range(ranges.get(0).from, ranges.get(ranges.size() - 1).to);
+    }
+
+    public static List<Range> calcRowRanges(List<DataSplit> dataSplits) {
+        List<Range> ranges = new ArrayList<>();
         for (DataSplit dataSplit : dataSplits) {
             for (DataFileMeta file : dataSplit.dataFiles()) {
-                Range range = file.nonNullRowIdRange();
-                start = Math.min(start, range.from);
-                end = Math.max(end, range.to);
+                ranges.add(file.nonNullRowIdRange());
             }
         }
-        return start == Long.MAX_VALUE ? null : new Range(start, end);
+        return Range.sortAndMergeOverlap(ranges, true);
+    }
+
+    public static List<DataSplit> splitByContiguousRowRange(List<DataSplit> 
splits) {
+        List<DataSplit> result = new ArrayList<>();
+        for (DataSplit split : splits) {
+            result.addAll(splitByContiguousRowRange(split));
+        }
+        return result;
+    }
+
+    public static Map<BinaryRow, Map<Range, List<DataSplit>>> 
groupSplitsByRange(
+            List<DataSplit> splits) {
+        Map<BinaryRow, List<Pair<Range, DataSplit>>> partitionSplitRanges = 
new HashMap<>();
+        for (DataSplit split : splits) {
+            Range splitRange = calcRowRange(split);
+            if (splitRange == null) {
+                continue;
+            }
+            BinaryRow partition = split.partition();
+            partitionSplitRanges
+                    .computeIfAbsent(partition, p -> new ArrayList<>())
+                    .add(Pair.of(splitRange, split));
+        }
+
+        Map<BinaryRow, Map<Range, List<DataSplit>>> result = new HashMap<>();
+        for (Map.Entry<BinaryRow, List<Pair<Range, DataSplit>>> partitionEntry 
:
+                partitionSplitRanges.entrySet()) {
+            List<Pair<Range, DataSplit>> splitRanges = 
partitionEntry.getValue();
+            splitRanges.sort(
+                    Comparator.comparingLong((Pair<Range, DataSplit> e) -> 
e.getKey().from)
+                            .thenComparingLong(e -> e.getKey().to));
+
+            Map<Range, List<DataSplit>> partitionRanges = new 
LinkedHashMap<>();
+            Range current = null;
+            List<DataSplit> currentSplits = new ArrayList<>();
+            for (Map.Entry<Range, DataSplit> entry : splitRanges) {
+                Range splitRange = entry.getKey();
+                if (current == null) {
+                    current = splitRange;
+                    currentSplits.add(entry.getValue());
+                    continue;
+                }
+                Range merged = Range.union(current, splitRange);
+                if (merged != null) {
+                    current = merged;
+                    currentSplits.add(entry.getValue());
+                } else {
+                    partitionRanges.put(current, currentSplits);
+                    current = splitRange;
+                    currentSplits = new ArrayList<>();
+                    currentSplits.add(entry.getValue());
+                }
+            }
+            if (current != null) {
+                partitionRanges.put(current, currentSplits);
+            }
+            result.put(partitionEntry.getKey(), partitionRanges);
+        }
+
+        return result;
+    }
+
+    private static List<DataSplit> splitByContiguousRowRange(DataSplit split) {
+        List<DataFileMeta> input = split.dataFiles();
+        RangeHelper<DataFileMeta> rangeHelper = new 
RangeHelper<>(DataFileMeta::nonNullRowIdRange);
+        List<List<DataFileMeta>> ranges = 
rangeHelper.mergeOverlappingRanges(input);
+
+        Supplier<DataSplit.Builder> builderSupplier =
+                () ->
+                        DataSplit.builder()
+                                .withSnapshot(split.snapshotId())
+                                .withPartition(split.partition())
+                                .withBucket(split.bucket())
+                                .withBucketPath(split.bucketPath())
+                                .withTotalBuckets(split.totalBuckets())
+                                .isStreaming(split.isStreaming())
+                                .rawConvertible(split.rawConvertible());
+        return packByContiguousRanges(builderSupplier, ranges);
+    }
+
+    private static List<DataSplit> packByContiguousRanges(
+            Supplier<DataSplit.Builder> builderFactory, 
List<List<DataFileMeta>> ranges) {
+        if (ranges.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        List<DataSplit> result = new ArrayList<>();
+        List<DataFileMeta> currentSegment = new ArrayList<>();
+        long currentMaxRowId = Long.MIN_VALUE;
+
+        for (List<DataFileMeta> rangeFiles : ranges) {
+            long minRowId = minRowId(rangeFiles);
+            long maxRowId = maxRowId(rangeFiles);
+            if (currentSegment.isEmpty() || areContiguous(currentMaxRowId, 
minRowId)) {
+                currentSegment.addAll(rangeFiles);
+                currentMaxRowId = maxRowId;
+            } else {
+                DataSplit.Builder builder = builderFactory.get();
+                builder.withDataFiles(currentSegment);
+                result.add(builder.build());
+                currentSegment = new ArrayList<>(rangeFiles);
+                currentMaxRowId = maxRowId;
+            }
+        }
+
+        DataSplit.Builder builder = builderFactory.get();
+        builder.withDataFiles(currentSegment);
+        result.add(builder.build());
+        return result;
+    }
+
+    private static long minRowId(List<DataFileMeta> files) {
+        return files.stream()
+                .mapToLong(f -> f.nonNullRowIdRange().from)
+                .min()
+                .orElse(Long.MAX_VALUE);
+    }
+
+    private static long maxRowId(List<DataFileMeta> files) {
+        return files.stream().mapToLong(f -> 
f.nonNullRowIdRange().to).max().orElse(Long.MIN_VALUE);
+    }
+
+    private static boolean areContiguous(long previousMaxRowId, long 
currentMinRowId) {
+        // Contiguous means no gap between adjacent ranges.
+        // e.g. previous max == current min (as requested) or previous max + 1 
== current min.
+        return previousMaxRowId >= currentMinRowId - 1;
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java 
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index c63456a2c2..a9bcfc9bbf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -515,7 +515,8 @@ public class JavaPyE2ETest {
         BTreeGlobalIndexBuilder builder =
                 new 
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
-            commit.commit(builder.build(builder.scan(), 
IOManager.create(warehouse.toString())));
+            commit.commit(
+                    builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
         }
 
         // assert index
@@ -583,7 +584,8 @@ public class JavaPyE2ETest {
         BTreeGlobalIndexBuilder builder =
                 new 
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
-            commit.commit(builder.build(builder.scan(), 
IOManager.create(warehouse.toString())));
+            commit.commit(
+                    builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
         }
 
         // assert index
@@ -653,7 +655,8 @@ public class JavaPyE2ETest {
         BTreeGlobalIndexBuilder builder =
                 new 
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
-            commit.commit(builder.build(builder.scan(), 
IOManager.create(warehouse.toString())));
+            commit.commit(
+                    builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
         }
 
         // assert index
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
new file mode 100644
index 0000000000..3ea2809e1c
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.PojoDataFileMeta;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for split regrouping in {@link BTreeGlobalIndexBuilder}. */
+public class BTreeGlobalIndexBuilderSplitTest {
+
+    @Test
+    public void testSplitByContiguousRowRangeFromDataFiles() {
+        DataFileMeta file1 = createDataFileMeta(0L, 100L);
+        DataFileMeta file2 = createDataFileMeta(300L, 100L);
+        DataFileMeta file3 = createDataFileMeta(100L, 100L);
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(1L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(0)
+                        .withBucketPath("bucket-0")
+                        .withDataFiles(Arrays.asList(file1, file2, file3))
+                        .isStreaming(false)
+                        .rawConvertible(false)
+                        .build();
+
+        List<DataSplit> rebuilt =
+                
BTreeGlobalIndexBuilder.splitByContiguousRowRange(Collections.singletonList(split));
+
+        assertThat(rebuilt).hasSize(2);
+        assertThat(rebuilt.get(0).dataFiles()).containsExactly(file1, file3);
+        assertThat(rebuilt.get(1).dataFiles()).containsExactly(file2);
+        assertThat(BTreeGlobalIndexBuilder.calcRowRange(rebuilt.get(0)))
+                .isEqualTo(new Range(0, 199));
+        assertThat(BTreeGlobalIndexBuilder.calcRowRange(rebuilt.get(1)))
+                .isEqualTo(new Range(300, 399));
+    }
+
+    private static DataFileMeta createDataFileMeta(long firstRowId, long 
rowCount) {
+        return new PojoDataFileMeta(
+                "test-file-" + UUID.randomUUID(),
+                1024L,
+                rowCount,
+                BinaryRow.EMPTY_ROW,
+                BinaryRow.EMPTY_ROW,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0L,
+                0L,
+                0L,
+                0,
+                Collections.emptyList(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                firstRowId,
+                null);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
index d37ad973da..bd748edb53 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
@@ -37,6 +37,7 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
@@ -107,7 +108,11 @@ public class BTreeGlobalIndexBuilderTest extends 
TableTestBase {
         builder.withIndexField("f0");
         builder.withIndexType("btree");
         builder.withPartitionPredicate(partitionPredicate);
-        List<CommitMessage> commitMessages = builder.build(builder.scan(), 
ioManager);
+        List<DataSplit> dataSplits = builder.scan();
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        for (DataSplit dataSplit : dataSplits) {
+            commitMessages.addAll(builder.build(dataSplit, ioManager));
+        }
 
         try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
             commit.commit(commitMessages);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index b4f30f82e9..9b1ace20a0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -29,11 +29,11 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RoaringNavigableMap64;
 
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -41,6 +41,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 /** Test for BTree indexed batch scan. */
@@ -58,8 +59,8 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
 
         RoaringNavigableMap64 rowIds = globalIndexScan(table, predicate);
         assertNotNull(rowIds);
-        Assertions.assertThat(rowIds.getLongCardinality()).isEqualTo(1);
-        Assertions.assertThat(rowIds.toRangeList()).containsExactly(new 
Range(100L, 100L));
+        assertThat(rowIds.getLongCardinality()).isEqualTo(1);
+        assertThat(rowIds.toRangeList()).containsExactly(new Range(100L, 
100L));
 
         Predicate predicate2 =
                 new PredicateBuilder(table.rowType())
@@ -72,8 +73,8 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
 
         rowIds = globalIndexScan(table, predicate2);
         assertNotNull(rowIds);
-        Assertions.assertThat(rowIds.getLongCardinality()).isEqualTo(3);
-        Assertions.assertThat(rowIds.toRangeList())
+        assertThat(rowIds.getLongCardinality()).isEqualTo(3);
+        assertThat(rowIds.toRangeList())
                 .containsExactlyInAnyOrder(
                         new Range(200L, 200L), new Range(300L, 300L), new 
Range(400L, 400L));
 
@@ -89,7 +90,7 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
                             readF1.add(row.getString(1).toString());
                         });
 
-        Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400");
+        assertThat(readF1).containsExactly("a200", "a300", "a400");
     }
 
     @Test
@@ -120,7 +121,7 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
                             readF1.add(row.getString(1).toString());
                         });
 
-        Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400", 
"a56789");
+        assertThat(readF1).containsExactly("a200", "a300", "a400", "a56789");
     }
 
     @Test
@@ -160,7 +161,7 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
                             result.add(row.getString(1).toString());
                         });
 
-        Assertions.assertThat(result).containsExactly("a200", "a56789");
+        assertThat(result).containsExactly("a200", "a56789");
     }
 
     private void createIndex(String fieldName) throws Exception {
@@ -169,7 +170,11 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
                 new BTreeGlobalIndexBuilder(table)
                         .withIndexType(BTreeGlobalIndexerFactory.IDENTIFIER)
                         .withIndexField(fieldName);
-        List<CommitMessage> commitMessages = builder.build(builder.scan(), 
ioManager);
+        List<DataSplit> dataSplits = builder.scan();
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        for (DataSplit dataSplit : dataSplits) {
+            commitMessages.addAll(builder.build(dataSplit, ioManager));
+        }
         try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
             commit.commit(commitMessages);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index 7ab2c69bd6..f69b0d704c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.btree;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.LogicalTypeConversion;
@@ -34,7 +35,6 @@ import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
-import org.apache.paimon.globalindex.RowIdIndexFieldsExtractor;
 import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
 import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
 import org.apache.paimon.options.Options;
@@ -47,6 +47,7 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Range;
 
@@ -60,9 +61,11 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.UUID;
 
-import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.calcRowRange;
+import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.groupSplitsByRange;
+import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitByContiguousRowRange;
 
 /** The {@link BTreeIndexTopoBuilder} for BTree index in Flink. */
 public class BTreeIndexTopoBuilder {
@@ -83,45 +86,115 @@ public class BTreeIndexTopoBuilder {
             indexBuilder = 
indexBuilder.withPartitionPredicate(partitionPredicate);
         }
 
-        List<DataSplit> splits = indexBuilder.scan();
-        Range range = calcRowRange(splits);
-        if (splits.isEmpty() || range == null) {
+        List<DataSplit> splits = 
splitByContiguousRowRange(indexBuilder.scan());
+        if (splits.isEmpty()) {
+            return;
+        }
+        Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
+                groupSplitsByRange(splits);
+        if (partitionRangeSplits.isEmpty()) {
             return;
         }
 
-        // 2. Select necessary columns (partition keys + index field + ROW_ID)
-        List<String> selectedColumns = new ArrayList<>(table.partitionKeys());
+        // 2. Select necessary columns (index field + ROW_ID)
+        List<String> selectedColumns = new ArrayList<>();
         selectedColumns.add(indexColumn);
 
         RowType readType = 
SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));
+        int indexFieldPos = readType.getFieldIndex(indexColumn);
+        int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
+        DataType indexFieldType = readType.getTypeAt(indexFieldPos);
 
-        // 3. Calculate parallelism and sort
+        // 3. Calculate maximum parallelism bound
         long recordsPerRange = 
userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
-        int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
         int maxParallelism = 
userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
+
+        // 4. Build one topology per contiguous row range
+        CoreOptions coreOptions = table.coreOptions();
+        ReadBuilder readBuilder = 
table.newReadBuilder().withReadType(readType);
+        List<String> sortColumns = new ArrayList<>();
+        sortColumns.add(indexColumn);
+        DataStream<Committable> allCommitMessages = null;
+        int partitionFieldSize = table.partitionKeys().size();
+        BinaryRowSerializer binaryRowSerializer = new 
BinaryRowSerializer(partitionFieldSize);
+        for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> partitionEntry :
+                partitionRangeSplits.entrySet()) {
+            BinaryRow partition = partitionEntry.getKey();
+            for (Map.Entry<Range, List<DataSplit>> entry : 
partitionEntry.getValue().entrySet()) {
+                Range range = entry.getKey();
+                List<DataSplit> rangeSplits = entry.getValue();
+                if (rangeSplits.isEmpty()) {
+                    continue;
+                }
+
+                DataStream<Committable> commitMessages =
+                        executeForPartitionRange(
+                                env,
+                                range,
+                                rangeSplits,
+                                readBuilder,
+                                indexBuilder,
+                                partitionFieldSize,
+                                
binaryRowSerializer.serializeToBytes(partition),
+                                indexFieldPos,
+                                rowIdPos,
+                                indexFieldType,
+                                sortColumns,
+                                coreOptions,
+                                readType,
+                                recordsPerRange,
+                                maxParallelism);
+
+                allCommitMessages =
+                        allCommitMessages == null
+                                ? commitMessages
+                                : allCommitMessages.union(commitMessages);
+            }
+        }
+        if (allCommitMessages != null) {
+            commit(table, allCommitMessages);
+        }
+
+        env.execute("Create btree global index for table: " + table.name());
+    }
+
+    private static DataStream<Committable> executeForPartitionRange(
+            StreamExecutionEnvironment env,
+            Range range,
+            List<DataSplit> rangeSplits,
+            ReadBuilder readBuilder,
+            BTreeGlobalIndexBuilder indexBuilder,
+            int partitionFieldSize,
+            byte[] partition,
+            int indexFieldPos,
+            int rowIdPos,
+            DataType indexFieldType,
+            List<String> sortColumns,
+            CoreOptions coreOptions,
+            RowType readType,
+            long recordsPerRange,
+            int maxParallelism) {
+        int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
         parallelism = Math.min(parallelism, maxParallelism);
 
-        // 4. Create source from splits and select columns
         DataStream<DataSplit> sourceStream =
-                env.fromData(new JavaTypeInfo<>(DataSplit.class), 
splits.toArray(new DataSplit[0]))
-                        .name("Global Index Source")
+                env.fromData(
+                                new JavaTypeInfo<>(DataSplit.class),
+                                rangeSplits.toArray(new DataSplit[0]))
+                        .name("Global Index Source " + " range=" + range)
                         .setParallelism(1);
 
-        ReadBuilder readBuilder = 
table.newReadBuilder().withReadType(readType);
         DataStream<RowData> rowDataStream =
                 sourceStream
                         .transform(
-                                "Read Data",
+                                "Read Data " + range,
                                 
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(readType)),
                                 new ReadDataOperator(readBuilder))
                         .setParallelism(parallelism);
 
-        // 5. Sort data using TableSorter style
-        // Configure sort info similar to SortCompactAction
-        CoreOptions coreOptions = table.coreOptions();
         TableSortInfo sortInfo =
                 new TableSortInfo.Builder()
-                        .setSortColumns(selectedColumns)
+                        .setSortColumns(sortColumns)
                         .setSortStrategy(CoreOptions.OrderType.ORDER)
                         .setSinkParallelism(parallelism)
                         .setLocalSampleSize(parallelism * 
coreOptions.getLocalSampleMagnification())
@@ -129,24 +202,23 @@ public class BTreeIndexTopoBuilder {
                         .setRangeNumber(parallelism * 10)
                         .build();
 
-        // Use TableSorter for sorting
         TableSorter sorter =
                 TableSorter.getSorter(env, rowDataStream, coreOptions, 
readType, sortInfo);
         DataStream<RowData> sortedStream = sorter.sort();
 
-        // 6. Build index for each partition
-        DataStream<Committable> commitMessages =
-                sortedStream
-                        .transform(
-                                "write-btree-index",
-                                new CommittableTypeInfo(),
-                                new WriteIndexOperator(range, indexBuilder))
-                        .setParallelism(parallelism);
-
-        // 7. Commit all commit messages
-        commit(table, commitMessages);
-
-        env.execute("Create btree global index for table: " + table.name());
+        return sortedStream
+                .transform(
+                        "write-btree-index " + range,
+                        new CommittableTypeInfo(),
+                        new WriteIndexOperator(
+                                range,
+                                partitionFieldSize,
+                                partition,
+                                indexBuilder,
+                                indexFieldPos,
+                                rowIdPos,
+                                indexFieldType))
+                .setParallelism(parallelism);
     }
 
     private static void commit(FileStoreTable table, DataStream<Committable> 
written) {
@@ -154,7 +226,7 @@ public class BTreeIndexTopoBuilder {
                 new CommitterOperatorFactory<>(
                         false,
                         true,
-                        "BTreeIndexCommitter",
+                        "BTreeIndexCommitter-" + UUID.randomUUID(),
                         context ->
                                 new StoreCommitter(
                                         table, 
table.newCommit(context.commitUser()), context),
@@ -200,59 +272,75 @@ public class BTreeIndexTopoBuilder {
     private static class WriteIndexOperator extends 
BoundedOneInputOperator<RowData, Committable> {
 
         private final Range rowRange;
+        private final byte[] partition;
+        private final int partitionFieldSize;
         private final BTreeGlobalIndexBuilder builder;
+        private final int indexFieldPos;
+        private final int rowIdPos;
+        private final DataType indexFieldType;
 
         private transient long counter;
-        private transient BinaryRow currentPart;
         private transient GlobalIndexParallelWriter currentWriter;
         private transient List<CommitMessage> commitMessages;
-
-        public WriteIndexOperator(Range rowRange, BTreeGlobalIndexBuilder 
builder) {
+        private transient InternalRow.FieldGetter indexFieldGetter;
+        private transient BinaryRowSerializer binaryRowSerializer;
+
+        public WriteIndexOperator(
+                Range rowRange,
+                int partitionFieldSize,
+                byte[] partition,
+                BTreeGlobalIndexBuilder builder,
+                int indexFieldPos,
+                int rowIdPos,
+                DataType indexFieldType) {
             this.rowRange = rowRange;
+            this.partitionFieldSize = partitionFieldSize;
+            this.partition = partition;
             this.builder = builder;
+            this.indexFieldPos = indexFieldPos;
+            this.rowIdPos = rowIdPos;
+            this.indexFieldType = indexFieldType;
         }
 
         @Override
         public void open() throws Exception {
             super.open();
             commitMessages = new ArrayList<>();
+            indexFieldGetter = InternalRow.createFieldGetter(indexFieldType, 
indexFieldPos);
+            this.binaryRowSerializer = new 
BinaryRowSerializer(partitionFieldSize);
         }
 
         @Override
         public void processElement(StreamRecord<RowData> element) throws 
IOException {
             InternalRow row = new FlinkRowWrapper(element.getValue());
-
-            RowIdIndexFieldsExtractor extractor = builder.extractor();
-            BinaryRow partRow = extractor.extractPartition(row);
-
-            // the input is sorted by <partition, indexedField>
-            if (currentWriter != null) {
-                if (!Objects.equals(partRow, currentPart) || counter >= 
builder.recordsPerRange()) {
-                    commitMessages.add(
-                            builder.flushIndex(rowRange, 
currentWriter.finish(), currentPart));
-                    currentWriter = null;
-                    counter = 0;
-                }
+            if (currentWriter != null && counter >= builder.recordsPerRange()) 
{
+                commitMessages.add(
+                        builder.flushIndex(
+                                rowRange,
+                                currentWriter.finish(),
+                                
binaryRowSerializer.deserializeFromBytes(partition)));
+                currentWriter = null;
+                counter = 0;
             }
 
-            // write <value, rowId> pair to index file
-            currentPart = partRow;
             counter++;
 
             if (currentWriter == null) {
                 currentWriter = builder.createWriter();
             }
 
-            // convert the original rowId to local rowId
-            long localRowId = extractor.extractRowId(row) - rowRange.from;
-            currentWriter.write(extractor.extractIndexField(row), localRowId);
+            long localRowId = row.getLong(rowIdPos) - rowRange.from;
+            currentWriter.write(indexFieldGetter.getFieldOrNull(row), 
localRowId);
         }
 
         @Override
         public void endInput() throws IOException {
             if (counter > 0) {
                 commitMessages.add(
-                        builder.flushIndex(rowRange, currentWriter.finish(), 
currentPart));
+                        builder.flushIndex(
+                                rowRange,
+                                currentWriter.finish(),
+                                
binaryRowSerializer.deserializeFromBytes(partition)));
             }
             for (CommitMessage message : commitMessages) {
                 output.collect(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
index 51cd79f378..d6368da23f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -66,4 +67,77 @@ public class BTreeGlobalIndexITCase extends 
CatalogITCaseBase {
         // assert select with filter
         assertThat(sql("SELECT * FROM T WHERE id = 
100")).containsOnly(Row.of(100, "name_100"));
     }
+
+    @Test
+    public void testBTreeIndexWithMultiPartition() throws 
Catalog.TableNotExistException {
+        sql(
+                "CREATE TABLE T_MP (pt INT, id INT, name STRING) PARTITIONED 
BY (pt) WITH ("
+                        + "'global-index.enabled' = 'true', "
+                        + "'row-tracking.enabled' = 'true', "
+                        + "'data-evolution.enabled' = 'true'"
+                        + ")");
+
+        // write partition 0: 1,000 rows (two inserts of 500)
+        insertPartitionRows("T_MP", 0, 0, 500, "p0_a_");
+        insertPartitionRows("T_MP", 0, 500, 500, "p0_a_");
+        // write partition 1: 1,000 rows
+        insertPartitionRows("T_MP", 1, 1_000, 1_000, "p1_");
+        // write partition 0 again: 1,000 rows
+        insertPartitionRows("T_MP", 0, 2_000, 1_000, "p0_b_");
+
+        buildBTreeIndexForTable("T_MP", "id");
+
+        FileStoreTable table = paimonTable("T_MP");
+        List<IndexManifestEntry> btreeEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(e -> "btree".equals(e.indexFile().indexType()))
+                        .collect(Collectors.toList());
+
+        long totalRowCount =
+                btreeEntries.stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .mapToLong(IndexFileMeta::rowCount)
+                        .sum();
+        Map<Object, Long> partitionRowCounts =
+                btreeEntries.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        IndexManifestEntry::partition,
+                                        Collectors.summingLong(e -> 
e.indexFile().rowCount())));
+
+        assertThat(partitionRowCounts).hasSize(2);
+        
assertThat(partitionRowCounts.values()).containsExactlyInAnyOrder(1_000L, 
2_000L);
+        assertThat(totalRowCount).isEqualTo(3_000L);
+
+        assertThat(sql("SELECT * FROM T_MP WHERE id = 999"))
+                .containsOnly(Row.of(0, 999, "p0_a_999"));
+        assertThat(sql("SELECT * FROM T_MP WHERE id = 1500"))
+                .containsOnly(Row.of(1, 1500, "p1_1500"));
+        assertThat(sql("SELECT * FROM T_MP WHERE id = 2500"))
+                .containsOnly(Row.of(0, 2500, "p0_b_2500"));
+    }
+
+    private void insertPartitionRows(
+            String tableName, int partition, int startId, int count, String 
namePrefix) {
+        final int batchSize = 5_000;
+        for (int offset = 0; offset < count; offset += batchSize) {
+            int batchStart = startId + offset;
+            int batchEnd = Math.min(startId + count, batchStart + batchSize);
+            String values =
+                    IntStream.range(batchStart, batchEnd)
+                            .mapToObj(
+                                    i ->
+                                            String.format(
+                                                    "(%d, %d, '%s%d')",
+                                                    partition, i, namePrefix, 
i))
+                            .collect(Collectors.joining(","));
+            sql("INSERT INTO %s VALUES %s", tableName, values);
+        }
+    }
+
+    private void buildBTreeIndexForTable(String tableName, String indexColumn) 
{
+        sql(
+                "CALL sys.create_global_index(`table` => 'default.%s', 
index_column => '%s', index_type => 'btree')",
+                tableName, indexColumn);
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
index fc8c6b83cb..20d8bc5de2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
@@ -84,7 +84,14 @@ public class DefaultGlobalIndexBuilder implements 
Serializable {
         LongCounter rowCounter = new LongCounter(0);
         List<ResultEntry> resultEntries = writePaimonRows(data, rowCounter);
         List<IndexFileMeta> indexFileMetas =
-                toIndexFileMetas(table, rowRange, indexField.id(), indexType, 
resultEntries);
+                toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange,
+                        indexField.id(),
+                        indexType,
+                        resultEntries);
         DataIncrement dataIncrement = 
DataIncrement.indexIncrement(indexFileMetas);
         return new CommitMessageImpl(
                 partition, 0, null, dataIncrement, 
CompactIncrement.emptyIncrement());
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index fdfe0f532a..644fa81cdd 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.spark.globalindex.btree;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
 import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
 import org.apache.paimon.options.Options;
@@ -27,7 +29,6 @@ import org.apache.paimon.spark.SparkRow;
 import org.apache.paimon.spark.globalindex.GlobalIndexTopologyBuilder;
 import org.apache.paimon.spark.util.ScanPlanHelper$;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.paimon.table.source.DataSplit;
@@ -51,8 +52,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
-import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.calcRowRange;
+import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.groupSplitsByRange;
+import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitByContiguousRowRange;
 
 /** The {@link GlobalIndexTopologyBuilder} for BTree index. */
 public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
@@ -82,72 +85,94 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
             indexBuilder = 
indexBuilder.withPartitionPredicate(partitionPredicate);
         }
 
-        List<DataSplit> splits = indexBuilder.scan();
-        Range range = calcRowRange(splits);
-        if (splits.isEmpty() || range == null) {
+        List<DataSplit> splits = 
splitByContiguousRowRange(indexBuilder.scan());
+        if (splits.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
+                groupSplitsByRange(splits);
+        if (partitionRangeSplits.isEmpty()) {
             return Collections.emptyList();
         }
 
-        // we need to read all partition columns for shuffle
-        List<String> selectedColumns = new ArrayList<>();
-        selectedColumns.addAll(table.partitionKeys());
-        selectedColumns.addAll(readType.getFieldNames());
-
-        Dataset<Row> source =
-                PaimonUtils.createDataset(
-                        spark,
-                        ScanPlanHelper$.MODULE$.createNewScanPlan(
-                                splits.toArray(new DataSplit[0]), relation));
-
-        Dataset<Row> selected =
-                
source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new));
-
-        // 2. shuffle and sort by partitions and index keys
-        Column[] sortFields =
-                selectedColumns.stream()
-                        .filter(name -> 
!SpecialFields.ROW_ID.name().equals(name))
-                        .map(functions::col)
-                        .toArray(Column[]::new);
+        List<String> selectedColumns = new 
ArrayList<>(readType.getFieldNames());
 
+        // Calculate maximum parallelism bound
         long recordsPerRange = 
options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
-        // this should be superfast since append only table can utilize 
count-start pushdown well.
-        long rowCount = source.count();
-        int partitionNum = Math.max((int) (rowCount / recordsPerRange), 1);
         int maxParallelism = 
options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
-        partitionNum = Math.min(partitionNum, maxParallelism);
-
-        // For efficiency, we do not repartition within each paimon partition. 
Instead, we directly
-        // divide ranges by <partitions, index field>, and each subtask is 
expected to process
-        // records from multiple partitions. The drawback is that if a Paimon 
partition spans
-        // multiple Spark partitions, the first and last output files may 
contain relatively few
-        // records.
-        Dataset<Row> partitioned =
-                selected.repartitionByRange(partitionNum, sortFields)
-                        .sortWithinPartitions(sortFields);
-
-        // 3. write index for each partition & range
-        final byte[] serializedBuilder = 
InstantiationUtil.serializeObject(indexBuilder);
-        final RowType rowType =
-                
SpecialFields.rowTypeWithRowId(table.rowType()).project(selectedColumns);
-        JavaRDD<byte[]> written =
-                partitioned
-                        .javaRDD()
-                        .map(row -> (InternalRow) (new SparkRow(rowType, row)))
-                        .mapPartitions(
-                                (FlatMapFunction<Iterator<InternalRow>, 
byte[]>)
-                                        iter -> buildBTreeIndex(iter, 
serializedBuilder, range));
-
-        // 4. collect all commit messages and return
-        List<byte[]> commitBytes = written.collect();
-        return CommitMessageSerializer.deserializeAll(commitBytes);
+
+        List<CommitMessage> allMessages = new ArrayList<>();
+        List<String> sortColumns = new ArrayList<>();
+        sortColumns.add(indexField.name());
+        final int partitionKeyNum = table.partitionKeys().size();
+        BinaryRowSerializer binaryRowSerializer = new 
BinaryRowSerializer(partitionKeyNum);
+        for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> partitionEntry :
+                partitionRangeSplits.entrySet()) {
+            for (Map.Entry<Range, List<DataSplit>> entry : 
partitionEntry.getValue().entrySet()) {
+                Range range = entry.getKey();
+                List<DataSplit> rangeSplits = entry.getValue();
+                if (rangeSplits.isEmpty()) {
+                    continue;
+                }
+                int partitionNum = Math.max((int) (range.count() / 
recordsPerRange), 1);
+                partitionNum = Math.min(partitionNum, maxParallelism);
+
+                Dataset<Row> source =
+                        PaimonUtils.createDataset(
+                                spark,
+                                ScanPlanHelper$.MODULE$.createNewScanPlan(
+                                        rangeSplits.toArray(new DataSplit[0]), 
relation));
+
+                Dataset<Row> selected =
+                        source.select(
+                                selectedColumns.stream()
+                                        .map(functions::col)
+                                        .toArray(Column[]::new));
+
+                Column[] sortFields =
+                        
sortColumns.stream().map(functions::col).toArray(Column[]::new);
+
+                Dataset<Row> partitioned =
+                        selected.repartitionByRange(partitionNum, sortFields)
+                                .sortWithinPartitions(sortFields);
+
+                final byte[] serializedBuilder = 
InstantiationUtil.serializeObject(indexBuilder);
+                final byte[] partitionBytes =
+                        
binaryRowSerializer.serializeToBytes(partitionEntry.getKey());
+                JavaRDD<byte[]> written =
+                        partitioned
+                                .javaRDD()
+                                .map(row -> (InternalRow) (new 
SparkRow(readType, row)))
+                                .mapPartitions(
+                                        
(FlatMapFunction<Iterator<InternalRow>, byte[]>)
+                                                iter ->
+                                                        buildBTreeIndex(
+                                                                iter,
+                                                                
serializedBuilder,
+                                                                range,
+                                                                
partitionKeyNum,
+                                                                
partitionBytes));
+                List<byte[]> commitBytes = written.collect();
+                
allMessages.addAll(CommitMessageSerializer.deserializeAll(commitBytes));
+            }
+        }
+        return allMessages;
     }
 
     private static Iterator<byte[]> buildBTreeIndex(
-            Iterator<InternalRow> input, byte[] serializedBuilder, Range range)
+            Iterator<InternalRow> input,
+            byte[] serializedBuilder,
+            Range range,
+            int partitionKeyNum,
+            byte[] partitionBytes)
             throws IOException, ClassNotFoundException {
+        final BinaryRowSerializer binaryRowSerializer = new 
BinaryRowSerializer(partitionKeyNum);
+        BinaryRow partition = 
binaryRowSerializer.deserializeFromBytes(partitionBytes);
         BTreeGlobalIndexBuilder builder =
                 InstantiationUtil.deserializeObject(
                         serializedBuilder, 
BTreeGlobalIndexBuilder.class.getClassLoader());
-        return CommitMessageSerializer.serializeAll(builder.build(range, 
input)).iterator();
+        return CommitMessageSerializer.serializeAll(
+                        builder.buildForSinglePartition(range, partition, 
input))
+                .iterator();
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
index 8ff94e1d21..ddc7477bf7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -157,7 +157,11 @@ public class CreateGlobalIndexProcedure extends 
BaseProcedure {
                                         userOptions);
 
                         try (TableCommitImpl commit =
-                                table.newCommit("global-index-create-" + 
UUID.randomUUID())) {
+                                table.newCommit(
+                                        "global-index-"
+                                                + indexType
+                                                + "-create-"
+                                                + UUID.randomUUID())) {
                             commit.commit(indexResults);
                         }
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index e20469c371..029d6cd378 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -177,7 +177,6 @@ class CreateGlobalIndexProcedureTest extends 
PaimonSparkTestBase with StreamTest
         .asScala
         .filter(_.indexFile().indexType() == "btree")
         .map(_.indexFile())
-      table.store().newGlobalIndexScanBuilder().shardList()
       assert(btreeEntries.nonEmpty)
 
       // 1. assert total row count, file count and row range
@@ -237,22 +236,22 @@ class CreateGlobalIndexProcedureTest extends 
PaimonSparkTestBase with StreamTest
         (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
       spark.sql(s"INSERT INTO T VALUES $values")
 
-      values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
       spark.sql(s"INSERT INTO T VALUES $values")
 
-      values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
       spark.sql(s"INSERT INTO T VALUES $values")
 
       values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
       spark.sql(s"INSERT INTO T VALUES $values")
 
-      values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
       spark.sql(s"INSERT INTO T VALUES $values")
 
-      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
       spark.sql(s"INSERT INTO T VALUES $values")
 
-      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
       spark.sql(s"INSERT INTO T VALUES $values")
 
       val output =

Reply via email to