This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aef59c80c6 [core][spark] Move btree builder from spark to core (#7156)
aef59c80c6 is described below

commit aef59c80c6a986b2dc4ad785a73499c38f49445d
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 30 15:48:47 2026 +0800

    [core][spark] Move btree builder from spark to core (#7156)
---
 .../main/java/org/apache/paimon/utils/Range.java   |   2 +
 .../globalindex/GlobalIndexBuilderUtils.java       |   6 +-
 .../globalindex/RowIdIndexFieldsExtractor.java     |   2 +-
 .../globalindex/btree/BTreeGlobalIndexBuilder.java | 247 +++++++++++++++++++++
 .../paimon/utils/MutableObjectIteratorAdapter.java |  92 ++++++++
 ...leTest.java => BitmapGlobalIndexTableTest.java} | 171 +-------------
 .../paimon/table/BtreeGlobalIndexTableTest.java    | 196 ++++++++++++++++
 .../org/apache/paimon/table/TableTestBase.java     |   3 +
 .../paimon/flink/lookup/LookupTableTest.java       |   3 -
 .../globalindex/DefaultGlobalIndexBuilder.java     |   4 +-
 .../globalindex/btree/BTreeGlobalIndexBuilder.java | 154 -------------
 .../globalindex/btree/BTreeIndexTopoBuilder.java   |  44 ++--
 12 files changed, 569 insertions(+), 355 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 461a68fbab..4457a00254 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -29,6 +29,8 @@ import java.util.Objects;
 /** Range represents from (inclusive) and to (inclusive). */
 public class Range implements Serializable {
 
+    private static final long serialVersionUID = 1L;
+
     public final long from;
     public final long to;
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderUtils.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
similarity index 92%
rename from 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderUtils.java
rename to 
paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
index cd26dbfaf4..b53a0641ab 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderUtils.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
@@ -16,13 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.globalindex;
+package org.apache.paimon.globalindex;
 
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
-import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.ResultEntry;
 import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.index.IndexPathFactory;
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/RowIdIndexFieldsExtractor.java
similarity index 98%
rename from 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java
rename to 
paimon-core/src/main/java/org/apache/paimon/globalindex/RowIdIndexFieldsExtractor.java
index d3ca25a2cb..8eaf176891 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/RowIdIndexFieldsExtractor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.globalindex;
+package org.apache.paimon.globalindex;
 
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
new file mode 100644
index 0000000000..9dc9da7e6e
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.RowIdIndexFieldsExtractor;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.MutableObjectIteratorAdapter;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
+import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Builder to build btree global index. */
+public class BTreeGlobalIndexBuilder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double FLOATING = 1.2;
+
+    private final FileStoreTable table;
+    private final RowType rowType;
+    private final Options options;
+    private final long recordsPerRange;
+
+    private String indexType;
+    private DataField indexField;
+    private RowType readRowType;
+    private RowIdIndexFieldsExtractor extractor;
+
+    @Nullable private PartitionPredicate partitionPredicate;
+
+    public BTreeGlobalIndexBuilder(Table table) {
+        this.table = (FileStoreTable) table;
+        this.rowType = table.rowType();
+        this.options = this.table.coreOptions().toConfiguration();
+        this.recordsPerRange =
+                (long) (options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
+    }
+
+    public BTreeGlobalIndexBuilder withIndexType(String indexType) {
+        this.indexType = indexType;
+        return this;
+    }
+
+    public BTreeGlobalIndexBuilder withIndexField(String indexField) {
+        checkArgument(
+                rowType.containsField(indexField),
+                "Column '%s' does not exist in table '%s'.",
+                indexField,
+                table.fullName());
+        this.indexField = rowType.getField(indexField);
+        this.readRowType =
+                SpecialFields.rowTypeWithRowId(new RowType(singletonList(this.indexField)));
+        List<String> readColumns = new ArrayList<>(table.partitionKeys());
+        readColumns.addAll(readRowType.getFieldNames());
+        this.extractor =
+                new RowIdIndexFieldsExtractor(
+                        SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns),
+                        table.partitionKeys(),
+                        this.indexField.name());
+        return this;
+    }
+
+    public BTreeGlobalIndexBuilder withPartitionPredicate(PartitionPredicate partitionPredicate) {
+        this.partitionPredicate = partitionPredicate;
+        return this;
+    }
+
+    public List<DataSplit> scan() {
+        // 1. read the whole dataset of target partitions
+        SnapshotReader snapshotReader = table.newSnapshotReader();
+        if (partitionPredicate != null) {
+            snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
+        }
+
+        return snapshotReader.read().dataSplits();
+    }
+
+    public List<CommitMessage> build(List<DataSplit> splits, IOManager ioManager)
+            throws IOException {
+        Range rowRange = calcRowRange(splits);
+        if (splits.isEmpty() || rowRange == null) {
+            return Collections.emptyList();
+        }
+
+        CoreOptions options = new CoreOptions(this.options);
+        BinaryExternalSortBuffer buffer =
+                BinaryExternalSortBuffer.create(
+                        ioManager,
+                        readRowType,
+                        new int[] {0},
+                        options.writeBufferSize(),
+                        options.pageSize(),
+                        options.localSortMaxNumFileHandles(),
+                        options.spillCompressOptions(),
+                        options.writeBufferSpillDiskSize(),
+                        options.sequenceFieldSortOrderIsAscending());
+
+        List<Split> splitList = new ArrayList<>(splits);
+        RecordReader<InternalRow> reader =
+                table.newReadBuilder().withReadType(readRowType).newRead().createReader(splitList);
+        try (CloseableIterator<InternalRow> iterator = reader.toCloseableIterator()) {
+            while (iterator.hasNext()) {
+                InternalRow row = iterator.next();
+                buffer.write(row);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        Iterator<InternalRow> iterator =
+                new MutableObjectIteratorAdapter<>(
+                        buffer.sortedIterator(), new BinaryRow(readRowType.getFieldCount()));
+        List<CommitMessage> result = build(rowRange, iterator);
+        buffer.clear();
+
+        return result;
+    }
+
+    public List<CommitMessage> build(Range rowRange, Iterator<InternalRow> data)
+            throws IOException {
+        long counter = 0;
+        BinaryRow currentPart = null;
+        GlobalIndexParallelWriter currentWriter = null;
+        List<CommitMessage> commitMessages = new ArrayList<>();
+
+        while (data.hasNext()) {
+            InternalRow row = data.next();
+            BinaryRow partRow = extractor.extractPartition(row);
+
+            // the input is sorted by <partition, indexedField>
+            if (currentWriter != null) {
+                if (!Objects.equals(partRow, currentPart) || counter >= recordsPerRange) {
+                    commitMessages.add(flushIndex(rowRange, currentWriter.finish(), currentPart));
+                    currentWriter = null;
+                    counter = 0;
+                }
+            }
+
+            // write <value, rowId> pair to index file
+            currentPart = partRow;
+            counter++;
+
+            if (currentWriter == null) {
+                currentWriter = createWriter();
+            }
+
+            // convert the original rowId to local rowId
+            long localRowId = extractor.extractRowId(row) - rowRange.from;
+            currentWriter.write(extractor.extractIndexField(row), localRowId);
+        }
+
+        if (counter > 0) {
+            commitMessages.add(flushIndex(rowRange, currentWriter.finish(), currentPart));
+        }
+
+        return commitMessages;
+    }
+
+    private GlobalIndexParallelWriter createWriter() throws IOException {
+        GlobalIndexParallelWriter currentWriter;
+        GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, indexField, options);
+        if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
+            throw new RuntimeException(
+                    "Unexpected implementation, the index writer of BTree should be an instance of GlobalIndexParallelWriter, but found: "
+                            + indexWriter.getClass().getName());
+        }
+        currentWriter = (GlobalIndexParallelWriter) indexWriter;
+        return currentWriter;
+    }
+
+    private CommitMessage flushIndex(
+            Range rowRange, List<ResultEntry> resultEntries, BinaryRow partition)
+            throws IOException {
+        List<IndexFileMeta> indexFileMetas =
+                toIndexFileMetas(table, rowRange, indexField.id(), indexType, resultEntries);
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
+        return new CommitMessageImpl(
+                partition, 0, null, dataIncrement, CompactIncrement.emptyIncrement());
+    }
+
+    public static Range calcRowRange(List<DataSplit> dataSplits) {
+        long start = Long.MAX_VALUE;
+        long end = Long.MIN_VALUE;
+        for (DataSplit dataSplit : dataSplits) {
+            for (DataFileMeta file : dataSplit.dataFiles()) {
+                long firstRowId = file.nonNullFirstRowId();
+                start = Math.min(start, firstRowId);
+                end = Math.max(end, firstRowId + file.rowCount());
+            }
+        }
+        return start == Long.MAX_VALUE ? null : new Range(start, end);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/MutableObjectIteratorAdapter.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/MutableObjectIteratorAdapter.java
new file mode 100644
index 0000000000..a4a99e12e7
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/MutableObjectIteratorAdapter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapter class that wraps a {@link MutableObjectIterator} into a standard Java {@link Iterator}.
+ *
+ * <p>This adapter handles the key differences between {@link MutableObjectIterator} and standard
+ * {@link Iterator}:
+ *
+ * <ul>
+ *   <li>{@link MutableObjectIterator} returns {@code null} to indicate the end of iteration, while
+ *       {@link Iterator} uses {@link #hasNext()} and {@link #next()} separation.
+ *   <li>{@link MutableObjectIterator} can throw {@link IOException} from its {@code next()} method,
+ *       while standard {@link Iterator} does not declare checked exceptions. This adapter wraps
+ *       {@link IOException} into {@link RuntimeException}.
+ * </ul>
+ *
+ * @param <E> The element type of the iterator.
+ */
+public class MutableObjectIteratorAdapter<I extends E, E> implements Iterator<E> {
+
+    private final MutableObjectIterator<I> delegate;
+    private final I instance;
+    private E nextElement;
+    private boolean hasNext = false;
+    private boolean initialized = false;
+
+    /**
+     * Creates a new adapter wrapping the given {@link MutableObjectIterator}.
+     *
+     * @param delegate The iterator to wrap.
+     */
+    public MutableObjectIteratorAdapter(MutableObjectIterator<I> delegate, I instance) {
+        this.delegate = delegate;
+        this.instance = instance;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!initialized) {
+            prefetch();
+        }
+        return hasNext;
+    }
+
+    @Override
+    public E next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        E result = nextElement;
+        prefetch();
+        return result;
+    }
+
+    /**
+     * Prefetches the next element from the delegate iterator.
+     *
+     * <p>This method reads ahead one element to support the {@link #hasNext()} check required by
+     * the standard {@link Iterator} interface.
+     */
+    private void prefetch() {
+        try {
+            nextElement = delegate.next(instance);
+            hasNext = (nextElement != null);
+            initialized = true;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read next element from MutableObjectIterator", e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/GlobalIndexTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
similarity index 61%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/GlobalIndexTableTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
index b375cea8d6..65ec24b9bb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/GlobalIndexTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
@@ -21,11 +21,9 @@ package org.apache.paimon.table;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.globalindex.DataEvolutionBatchScan;
 import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
-import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
 import org.apache.paimon.globalindex.GlobalIndexResult;
 import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
 import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
@@ -35,13 +33,10 @@ import 
org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
 import org.apache.paimon.globalindex.ResultEntry;
 import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
 import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
-import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
-import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
 import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -56,36 +51,22 @@ import org.apache.paimon.utils.RoaringNavigableMap64;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 /** Test for BTree indexed batch scan. */
-public class GlobalIndexTableTest extends DataEvolutionTestBase {
-
-    private static final Logger log = 
LoggerFactory.getLogger(GlobalIndexTableTest.class);
+public class BitmapGlobalIndexTableTest extends DataEvolutionTestBase {
 
     @Test
     public void testBitmapGlobalIndex() throws Exception {
-        innerTestGlobalIndex(BitmapGlobalIndexerFactory.IDENTIFIER);
-    }
-
-    @Test
-    public void testBTreeGlobalIndex() throws Exception {
-        innerTestGlobalIndex(BTreeGlobalIndexerFactory.IDENTIFIER);
-    }
-
-    private void innerTestGlobalIndex(String indexType) throws Exception {
         write(100000L);
-        createIndex(indexType, "f1");
+        createIndex("f1");
 
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
 
@@ -130,17 +111,8 @@ public class GlobalIndexTableTest extends 
DataEvolutionTestBase {
 
     @Test
     public void testBitmapGlobalIndexWithCoreScan() throws Exception {
-        
innerTestGlobalIndexWithCoreScan(BitmapGlobalIndexerFactory.IDENTIFIER);
-    }
-
-    @Test
-    public void testBTreeGlobalIndexWithCoreScan() throws Exception {
-        innerTestGlobalIndexWithCoreScan(BTreeGlobalIndexerFactory.IDENTIFIER);
-    }
-
-    private void innerTestGlobalIndexWithCoreScan(String indexType) throws 
Exception {
         write(100000L);
-        createIndex(indexType, "f1");
+        createIndex("f1");
 
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
 
@@ -170,18 +142,9 @@ public class GlobalIndexTableTest extends 
DataEvolutionTestBase {
 
     @Test
     public void testMultipleBitmapIndices() throws Exception {
-        innerTestMultipleIndices(BitmapGlobalIndexerFactory.IDENTIFIER);
-    }
-
-    @Test
-    public void testMultipleBTreeIndices() throws Exception {
-        innerTestMultipleIndices(BTreeGlobalIndexerFactory.IDENTIFIER);
-    }
-
-    private void innerTestMultipleIndices(String indexType) throws Exception {
         write(100000L);
-        createIndex(indexType, "f1");
-        createIndex(indexType, "f2");
+        createIndex("f1");
+        createIndex("f2");
 
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
         Predicate predicate1 =
@@ -217,7 +180,7 @@ public class GlobalIndexTableTest extends 
DataEvolutionTestBase {
         Assertions.assertThat(result).containsExactly("a200", "a56789");
     }
 
-    private void createIndex(String indexType, String fieldName) throws 
Exception {
+    private void createIndex(String fieldName) throws Exception {
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
         FileIO fileIO = table.fileIO();
         RowType rowType = 
SpecialFields.rowTypeWithRowTracking(table.rowType().project(fieldName));
@@ -231,25 +194,11 @@ public class GlobalIndexTableTest extends 
DataEvolutionTestBase {
                         
table.store().pathFactory().indexFileFactory(BinaryRow.EMPTY_ROW, 0));
 
         DataField indexField = table.rowType().getField(fieldName);
-        GlobalIndexerFactory globalIndexerFactory = 
GlobalIndexerFactoryUtils.load(indexType);
-
-        List<IndexFileMeta> indexFileMetas;
-        if (indexType.equals(BTreeGlobalIndexerFactory.IDENTIFIER)) {
-            indexFileMetas =
-                    createBTreeIndex(
-                            fileIO,
-                            globalIndexerFactory,
-                            indexField,
-                            rowType,
-                            reader,
-                            indexFileReadWrite);
-        } else if (indexType.equals(BitmapGlobalIndexerFactory.IDENTIFIER)) {
-            indexFileMetas =
-                    createBitmapIndex(
-                            fileIO, globalIndexerFactory, indexField, reader, 
indexFileReadWrite);
-        } else {
-            throw new Exception("Unsupported scalar index type: " + indexType);
-        }
+        GlobalIndexerFactory globalIndexerFactory =
+                
GlobalIndexerFactoryUtils.load(BitmapGlobalIndexerFactory.IDENTIFIER);
+
+        List<IndexFileMeta> indexFileMetas =
+                createBitmapIndex(globalIndexerFactory, indexField, reader, 
indexFileReadWrite);
 
         DataIncrement dataIncrement = 
DataIncrement.indexIncrement(indexFileMetas);
 
@@ -265,7 +214,6 @@ public class GlobalIndexTableTest extends 
DataEvolutionTestBase {
     }
 
     private List<IndexFileMeta> createBitmapIndex(
-            FileIO fileIO,
             GlobalIndexerFactory indexerFactory,
             DataField indexField,
             RecordReader<InternalRow> reader,
@@ -296,103 +244,6 @@ public class GlobalIndexTableTest extends 
DataEvolutionTestBase {
                         null));
     }
 
-    private List<IndexFileMeta> createBTreeIndex(
-            FileIO fileIO,
-            GlobalIndexerFactory indexerFactory,
-            DataField indexField,
-            RowType rowType,
-            RecordReader<InternalRow> reader,
-            GlobalIndexFileReadWrite indexFileReadWrite)
-            throws Exception {
-        Options options = new Options();
-        options.set(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE, 
MemorySize.ofMebiBytes(1));
-        GlobalIndexer globalIndexer = indexerFactory.create(indexField, 
options);
-
-        // collect all rows
-        List<InternalRow> rows = new ArrayList<>();
-        InternalRowSerializer rowSerializer = new 
InternalRowSerializer(rowType);
-        reader.forEachRemaining(row -> rows.add(rowSerializer.copy(row)));
-        // sort by row id
-        rows.sort(Comparator.comparing(row -> row.getLong(1)));
-        // intentionally split into two chunks, build index for each range
-        int midRow = rows.size() / 2;
-        Range firstRange = new Range(rows.get(0).getLong(1), 
rows.get(midRow).getLong(1));
-        Range secondRange =
-                new Range(rows.get(midRow + 1).getLong(1), 
rows.get(rows.size() - 1).getLong(1));
-
-        List<IndexFileMeta> indexFileMetas = new ArrayList<>();
-        indexFileMetas.addAll(
-                createBTreeIndexForRange(
-                        fileIO,
-                        globalIndexer,
-                        indexField,
-                        rows.subList(0, midRow + 1),
-                        indexFileReadWrite,
-                        firstRange));
-        indexFileMetas.addAll(
-                createBTreeIndexForRange(
-                        fileIO,
-                        globalIndexer,
-                        indexField,
-                        rows.subList(midRow + 1, rows.size()),
-                        indexFileReadWrite,
-                        secondRange));
-
-        return indexFileMetas;
-    }
-
-    private List<IndexFileMeta> createBTreeIndexForRange(
-            FileIO fileIO,
-            GlobalIndexer globalIndexer,
-            DataField indexField,
-            List<InternalRow> rowChunk,
-            GlobalIndexFileReadWrite indexFileReadWrite,
-            Range range)
-            throws Exception {
-        // collect all data and sort by index field.
-        // this will be done distributedly by Spark/Flink in production.
-        List<InternalRow> rows = new ArrayList<>(rowChunk);
-        rows.sort(Comparator.comparing(row -> row.getString(0).toString()));
-
-        int fileNum = 10;
-        int targetFileSize = rows.size() / fileNum;
-        int currentOffset = 0;
-
-        List<IndexFileMeta> indexFileMetas = new ArrayList<>();
-        for (int i = 0; i < fileNum; i++) {
-            // write btree file for each data chunk.
-            GlobalIndexParallelWriter writer =
-                    (GlobalIndexParallelWriter) 
globalIndexer.createWriter(indexFileReadWrite);
-            for (int j = currentOffset;
-                    j < Math.min(currentOffset + targetFileSize, rows.size());
-                    j++) {
-                InternalRow row = rows.get(j);
-                writer.write(row.getString(0), row.getLong(1) - range.from);
-            }
-            currentOffset += targetFileSize;
-
-            List<ResultEntry> entries = writer.finish();
-            Assertions.assertThat(entries).hasSize(1);
-            ResultEntry entry = entries.get(0);
-
-            String fileName = entry.fileName();
-            long fileSize = indexFileReadWrite.fileSize(fileName);
-            GlobalIndexMeta globalIndexMeta =
-                    new GlobalIndexMeta(range.from, range.to, indexField.id(), 
null, entry.meta());
-
-            indexFileMetas.add(
-                    new IndexFileMeta(
-                            BTreeGlobalIndexerFactory.IDENTIFIER,
-                            fileName,
-                            fileSize,
-                            entry.rowCount(),
-                            globalIndexMeta,
-                            null));
-        }
-
-        return indexFileMetas;
-    }
-
     private RoaringNavigableMap64 globalIndexScan(FileStoreTable table, 
Predicate predicate)
             throws Exception {
         GlobalIndexScanBuilder indexScanBuilder = 
table.store().newGlobalIndexScanBuilder();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
new file mode 100644
index 0000000000..b4f30f82e9
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.globalindex.DataEvolutionBatchScan;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
+import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Test for BTree indexed batch scan. */
+public class BtreeGlobalIndexTableTest extends DataEvolutionTestBase {
+
+    @Test
+    public void testBTreeGlobalIndex() throws Exception {
+        write(100000L);
+        createIndex("f1");
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+
+        Predicate predicate =
+                new PredicateBuilder(table.rowType()).equal(1, 
BinaryString.fromString("a100"));
+
+        RoaringNavigableMap64 rowIds = globalIndexScan(table, predicate);
+        assertNotNull(rowIds);
+        Assertions.assertThat(rowIds.getLongCardinality()).isEqualTo(1);
+        Assertions.assertThat(rowIds.toRangeList()).containsExactly(new 
Range(100L, 100L));
+
+        Predicate predicate2 =
+                new PredicateBuilder(table.rowType())
+                        .in(
+                                1,
+                                Arrays.asList(
+                                        BinaryString.fromString("a200"),
+                                        BinaryString.fromString("a300"),
+                                        BinaryString.fromString("a400")));
+
+        rowIds = globalIndexScan(table, predicate2);
+        assertNotNull(rowIds);
+        Assertions.assertThat(rowIds.getLongCardinality()).isEqualTo(3);
+        Assertions.assertThat(rowIds.toRangeList())
+                .containsExactlyInAnyOrder(
+                        new Range(200L, 200L), new Range(300L, 300L), new 
Range(400L, 400L));
+
+        DataEvolutionBatchScan scan = (DataEvolutionBatchScan) table.newScan();
+        RoaringNavigableMap64 finalRowIds = rowIds;
+        scan.withGlobalIndexResult(GlobalIndexResult.create(() -> 
finalRowIds));
+
+        List<String> readF1 = new ArrayList<>();
+        table.newRead()
+                .createReader(scan.plan())
+                .forEachRemaining(
+                        row -> {
+                            readF1.add(row.getString(1).toString());
+                        });
+
+        Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400");
+    }
+
+    @Test
+    public void testBTreeGlobalIndexWithCoreScan() throws Exception {
+        write(100000L);
+        createIndex("f1");
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+
+        Predicate predicate =
+                new PredicateBuilder(table.rowType())
+                        .in(
+                                1,
+                                Arrays.asList(
+                                        BinaryString.fromString("a200"),
+                                        BinaryString.fromString("a300"),
+                                        BinaryString.fromString("a400"),
+                                        BinaryString.fromString("a56789")));
+
+        ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+
+        List<String> readF1 = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(readBuilder.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            readF1.add(row.getString(1).toString());
+                        });
+
+        Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400", 
"a56789");
+    }
+
+    @Test
+    public void testMultipleBTreeIndices() throws Exception {
+        write(100000L);
+        createIndex("f1");
+        createIndex("f2");
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+        Predicate predicate1 =
+                new PredicateBuilder(table.rowType())
+                        .in(
+                                1,
+                                Arrays.asList(
+                                        BinaryString.fromString("a200"),
+                                        BinaryString.fromString("a300"),
+                                        BinaryString.fromString("a56789")));
+
+        Predicate predicate2 =
+                new PredicateBuilder(table.rowType())
+                        .in(
+                                2,
+                                Arrays.asList(
+                                        BinaryString.fromString("b200"),
+                                        BinaryString.fromString("b400"),
+                                        BinaryString.fromString("b56789")));
+
+        Predicate predicate = PredicateBuilder.and(predicate1, predicate2);
+        ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+
+        List<String> result = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(readBuilder.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            result.add(row.getString(1).toString());
+                        });
+
+        Assertions.assertThat(result).containsExactly("a200", "a56789");
+    }
+
+    private void createIndex(String fieldName) throws Exception {
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+        BTreeGlobalIndexBuilder builder =
+                new BTreeGlobalIndexBuilder(table)
+                        .withIndexType(BTreeGlobalIndexerFactory.IDENTIFIER)
+                        .withIndexField(fieldName);
+        List<CommitMessage> commitMessages = builder.build(builder.scan(), 
ioManager);
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.commit(commitMessages);
+        }
+    }
+
+    private RoaringNavigableMap64 globalIndexScan(FileStoreTable table, 
Predicate predicate)
+            throws Exception {
+        GlobalIndexScanBuilder indexScanBuilder = 
table.store().newGlobalIndexScanBuilder();
+        List<Range> ranges = indexScanBuilder.shardList();
+        GlobalIndexResult globalFileIndexResult = 
GlobalIndexResult.createEmpty();
+        for (Range range : ranges) {
+            try (RowRangeGlobalIndexScanner scanner =
+                    indexScanBuilder.withRowRange(range).build()) {
+                Optional<GlobalIndexResult> globalIndexResult = 
scanner.scan(predicate, null);
+                if (!globalIndexResult.isPresent()) {
+                    throw new RuntimeException("Can't find index result by 
scan");
+                }
+                globalFileIndexResult = 
globalFileIndexResult.or(globalIndexResult.get());
+            }
+        }
+
+        return globalFileIndexResult.results();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index a533017779..1a423aeb1e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.reader.RecordReader;
@@ -75,6 +76,7 @@ public abstract class TableTestBase {
     protected Catalog catalog;
     protected String database;
     @TempDir public java.nio.file.Path tempPath;
+    protected IOManager ioManager;
 
     @BeforeEach
     public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
@@ -82,6 +84,7 @@ public abstract class TableTestBase {
         warehouse = new Path(TraceableFileIO.SCHEME + "://" + 
tempPath.toString());
         catalog = 
CatalogFactory.createCatalog(CatalogContext.create(warehouse));
         catalog.createDatabase(database, true);
+        this.ioManager = new IOManagerImpl(tempPath.toString());
     }
 
     @AfterEach
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index ed172b5e58..b077e9dcdb 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.lookup.FullCacheLookupTable.TableBulkLoader;
 import org.apache.paimon.fs.FileIO;
@@ -89,7 +88,6 @@ public class LookupTableTest extends TableTestBase {
 
     @TempDir Path tempDir;
     private RowType rowType;
-    private IOManager ioManager;
     private FullCacheLookupTable table;
 
     public LookupTableTest(boolean inMemory) {
@@ -105,7 +103,6 @@ public class LookupTableTest extends TableTestBase {
     @BeforeEach
     public void before() throws IOException {
         this.rowType = RowType.of(new IntType(), new IntType(), new IntType());
-        this.ioManager = new IOManagerImpl(tempDir.toString());
     }
 
     @AfterEach
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
index e0a6c0e353..fc8c6b83cb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
@@ -39,8 +39,8 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
-import static 
org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
-import static 
org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
+import static 
org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
+import static 
org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
 
 /** Default global index builder. */
 public class DefaultGlobalIndexBuilder implements Serializable {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
deleted file mode 100644
index 0f520c5884..0000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.globalindex.btree;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
-import org.apache.paimon.globalindex.ResultEntry;
-import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.spark.globalindex.RowIdIndexFieldsExtractor;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.SpecialFields;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.CloseableIterator;
-import org.apache.paimon.utils.Range;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import static 
org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
-import static 
org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
-
-/**
- * A global index builder implementation for BTree Index. The caller of {@link
- * #build(CloseableIterator) build} must ensure the input data is sorted by 
partitions and indexed
- * field.
- */
-public class BTreeGlobalIndexBuilder implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final double FLOATING = 1.2;
-
-    private final FileStoreTable table;
-    private final DataField indexField;
-    private final String indexType;
-    private final Range rowRange;
-    private final Options options;
-    private final RowIdIndexFieldsExtractor extractor;
-    private final long recordsPerRange;
-
-    public BTreeGlobalIndexBuilder(
-            FileStoreTable table,
-            RowType readType,
-            DataField indexField,
-            String indexType,
-            Range rowRange,
-            Options options) {
-        this.table = table;
-        this.indexField = indexField;
-        this.indexType = indexType;
-        this.rowRange = rowRange;
-        this.options = options;
-        List<String> readColumns = new ArrayList<>(table.partitionKeys());
-        readColumns.addAll(readType.getFieldNames());
-        this.extractor =
-                new RowIdIndexFieldsExtractor(
-                        
SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns),
-                        table.partitionKeys(),
-                        indexField.name());
-
-        // Each partition boundary is derived from sampling, so we introduce a 
slack factor
-        // to avoid generating too many small files due to sampling variance.
-        this.recordsPerRange =
-                (long) 
(options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
-    }
-
-    public List<CommitMessage> build(CloseableIterator<InternalRow> data) 
throws IOException {
-        long counter = 0;
-        BinaryRow currentPart = null;
-        GlobalIndexParallelWriter currentWriter = null;
-        List<CommitMessage> commitMessages = new ArrayList<>();
-
-        while (data.hasNext()) {
-            InternalRow row = data.next();
-            BinaryRow partRow = extractor.extractPartition(row);
-
-            // the input is sorted by <partition, indexedField>
-            if (currentWriter != null) {
-                if (!Objects.equals(partRow, currentPart) || counter >= 
recordsPerRange) {
-                    commitMessages.add(flushIndex(currentWriter.finish(), 
currentPart));
-                    currentWriter = null;
-                    counter = 0;
-                }
-            }
-
-            // write <value, rowId> pair to index file
-            currentPart = partRow;
-            counter++;
-
-            if (currentWriter == null) {
-                currentWriter = createWriter();
-            }
-
-            // convert the original rowId to local rowId
-            long localRowId = extractor.extractRowId(row) - rowRange.from;
-            currentWriter.write(extractor.extractIndexField(row), localRowId);
-        }
-
-        if (counter > 0) {
-            commitMessages.add(flushIndex(currentWriter.finish(), 
currentPart));
-        }
-
-        return commitMessages;
-    }
-
-    private GlobalIndexParallelWriter createWriter() throws IOException {
-        GlobalIndexParallelWriter currentWriter;
-        GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, 
indexField, options);
-        if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
-            throw new RuntimeException(
-                    "Unexpected implementation, the index writer of BTree 
should be an instance of GlobalIndexParallelWriter, but found: "
-                            + indexWriter.getClass().getName());
-        }
-        currentWriter = (GlobalIndexParallelWriter) indexWriter;
-        return currentWriter;
-    }
-
-    private CommitMessage flushIndex(List<ResultEntry> resultEntries, 
BinaryRow partition)
-            throws IOException {
-        List<IndexFileMeta> indexFileMetas =
-                toIndexFileMetas(table, rowRange, indexField.id(), indexType, 
resultEntries);
-        DataIncrement dataIncrement = 
DataIncrement.indexIncrement(indexFileMetas);
-        return new CommitMessageImpl(
-                partition, 0, null, dataIncrement, 
CompactIncrement.emptyIncrement());
-    }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index 68e77cbf14..0b626da8a3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -19,8 +19,8 @@
 package org.apache.paimon.spark.globalindex.btree;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
 import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.spark.SparkRow;
@@ -31,10 +31,8 @@ import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.InstantiationUtil;
 import org.apache.paimon.utils.Range;
 
@@ -54,6 +52,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.calcRowRange;
+
 /** The {@link GlobalIndexTopologyBuilder} for BTree index. */
 public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
 
@@ -73,16 +73,18 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
             DataField indexField,
             Options options)
             throws IOException {
-
         // 1. read the whole dataset of target partitions
-        SnapshotReader snapshotReader = table.newSnapshotReader();
+        BTreeGlobalIndexBuilder indexBuilder =
+                new BTreeGlobalIndexBuilder(table)
+                        .withIndexType(indexType)
+                        .withIndexField(indexField.name());
         if (partitionPredicate != null) {
-            snapshotReader = 
snapshotReader.withPartitionFilter(partitionPredicate);
+            indexBuilder = 
indexBuilder.withPartitionPredicate(partitionPredicate);
         }
 
-        List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
-        Range range = calcRowRange(dataSplits);
-        if (dataSplits.isEmpty() || range == null) {
+        List<DataSplit> splits = indexBuilder.scan();
+        Range range = calcRowRange(splits);
+        if (splits.isEmpty() || range == null) {
             return Collections.emptyList();
         }
 
@@ -95,7 +97,7 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
                 PaimonUtils.createDataset(
                         spark,
                         ScanPlanHelper$.MODULE$.createNewScanPlan(
-                                dataSplits.toArray(new DataSplit[0]), 
relation));
+                                splits.toArray(new DataSplit[0]), relation));
 
         Dataset<Row> selected =
                 
source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new));
@@ -124,10 +126,7 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
                         .sortWithinPartitions(sortFields);
 
         // 3. write index for each partition & range
-        final byte[] builderBytes =
-                InstantiationUtil.serializeObject(
-                        new BTreeGlobalIndexBuilder(
-                                table, readType, indexField, indexType, range, 
options));
+        final byte[] builderBytes = 
InstantiationUtil.serializeObject(indexBuilder);
         final RowType rowType =
                 
SpecialFields.rowTypeWithRowId(table.rowType()).project(selectedColumns);
         JavaRDD<byte[]> written =
@@ -145,9 +144,7 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
                                                             
BTreeGlobalIndexBuilder.class
                                                                     
.getClassLoader());
                                             List<CommitMessage> commitMessages 
=
-                                                    builder.build(
-                                                            
CloseableIterator.adapterForIterator(
-                                                                    iter));
+                                                    builder.build(range, iter);
                                             List<byte[]> messageBytes = new 
ArrayList<>();
 
                                             for (CommitMessage commitMessage : 
commitMessages) {
@@ -171,17 +168,4 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
 
         return result;
     }
-
-    private Range calcRowRange(List<DataSplit> dataSplits) {
-        long start = Long.MAX_VALUE;
-        long end = Long.MIN_VALUE;
-        for (DataSplit dataSplit : dataSplits) {
-            for (DataFileMeta file : dataSplit.dataFiles()) {
-                long firstRowId = file.nonNullFirstRowId();
-                start = Math.min(start, firstRowId);
-                end = Math.max(end, firstRowId + file.rowCount());
-            }
-        }
-        return start == Long.MAX_VALUE ? null : new Range(start, end);
-    }
 }

Reply via email to