This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 71932f75c9 [core] Refactor scalar index eval and apply pre filter to 
vector search (#7513)
71932f75c9 is described below

commit 71932f75c93a6bffde7276332a558625b2ce6d9f
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 24 18:55:24 2026 +0800

    [core] Refactor scalar index eval and apply pre filter to vector search 
(#7513)
    
    This PR does two core things:
    1. Refactors the scalar index evaluation logic: removes
    GlobalIndexScanBuilder/GlobalIndexScanBuilderImpl and inlines their
    functionality into GlobalIndexScanner, simplifying the API.
    2. Introduces a pre-filter mechanism for vector search: splits scan
    and read into independent VectorScanImpl/VectorReadImpl classes,
    introduces VectorSearchSplit as an intermediate data structure, and
    supports pre-filtering with scalar indexes before vector search.
    
    In addition, this PR no longer queries unindexed files. If a global
    index exists, only the global index needs to be queried.
---
 .../paimon/globalindex/UnionGlobalIndexReader.java |  40 ++-
 .../java/org/apache/paimon/AbstractFileStore.java  |  13 -
 .../src/main/java/org/apache/paimon/FileStore.java |   3 -
 .../paimon/globalindex/DataEvolutionBatchScan.java |  43 +--
 .../paimon/globalindex/GlobalIndexScanBuilder.java |  95 -------
 .../globalindex/GlobalIndexScanBuilderImpl.java    | 187 ------------
 ...alIndexScanner.java => GlobalIndexScanner.java} |  85 ++++--
 .../org/apache/paimon/index/GlobalIndexMeta.java   |   5 +
 .../paimon/privilege/PrivilegedFileStore.java      |   6 -
 .../org/apache/paimon/table/source/VectorRead.java |   8 +-
 .../apache/paimon/table/source/VectorReadImpl.java | 163 +++++++++++
 .../org/apache/paimon/table/source/VectorScan.java |  17 +-
 .../apache/paimon/table/source/VectorScanImpl.java | 125 ++++++++
 .../table/source/VectorSearchBuilderImpl.java      | 138 +--------
 .../paimon/table/source/VectorSearchSplit.java     | 130 +++++++++
 .../paimon/table/BitmapGlobalIndexTableTest.java   |  22 +-
 .../paimon/table/BtreeGlobalIndexTableTest.java    |  62 +---
 .../table/source/VectorSearchBuilderTest.java      | 315 +++++++++++++++++++++
 paimon-python/pypaimon/globalindex/__init__.py     |   8 +-
 .../pypaimon/globalindex/global_index_evaluator.py |  36 +--
 .../pypaimon/globalindex/global_index_reader.py    |   6 +-
 .../globalindex/global_index_scan_builder.py       | 202 -------------
 .../globalindex/global_index_scan_builder_impl.py  | 165 -----------
 .../pypaimon/globalindex/global_index_scanner.py   | 162 +++++++++++
 paimon-python/pypaimon/read/read_builder.py        |   9 +-
 .../pypaimon/read/scanner/file_scanner.py          |  68 +----
 paimon-python/pypaimon/read/table_scan.py          |  15 +-
 paimon-python/pypaimon/table/file_store_table.py   |  19 --
 .../procedure/CreateGlobalIndexProcedureTest.scala |   2 -
 29 files changed, 1047 insertions(+), 1102 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
index 2c02a0c484..b585a75570 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
@@ -21,10 +21,19 @@ package org.apache.paimon.globalindex;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.VectorSearch;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
 
 /**
  * A {@link GlobalIndexReader} that combines results from multiple readers by 
performing a union
@@ -33,9 +42,16 @@ import java.util.function.Function;
 public class UnionGlobalIndexReader implements GlobalIndexReader {
 
     private final List<GlobalIndexReader> readers;
+    private final @Nullable ExecutorService executor;
 
     public UnionGlobalIndexReader(List<GlobalIndexReader> readers) {
+        this(readers, null);
+    }
+
+    public UnionGlobalIndexReader(
+            List<GlobalIndexReader> readers, @Nullable ExecutorService 
executor) {
         this.readers = readers;
+        this.executor = executor;
     }
 
     @Override
@@ -116,8 +132,9 @@ public class UnionGlobalIndexReader implements 
GlobalIndexReader {
     @Override
     public Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
         Optional<ScoredGlobalIndexResult> result = Optional.empty();
-        for (GlobalIndexReader reader : readers) {
-            Optional<ScoredGlobalIndexResult> current = 
reader.visitVectorSearch(vectorSearch);
+        List<Optional<ScoredGlobalIndexResult>> results =
+                executeAllReaders(reader -> 
reader.visitVectorSearch(vectorSearch));
+        for (Optional<ScoredGlobalIndexResult> current : results) {
             if (!current.isPresent()) {
                 continue;
             }
@@ -132,8 +149,8 @@ public class UnionGlobalIndexReader implements 
GlobalIndexReader {
     private Optional<GlobalIndexResult> union(
             Function<GlobalIndexReader, Optional<GlobalIndexResult>> visitor) {
         Optional<GlobalIndexResult> result = Optional.empty();
-        for (GlobalIndexReader reader : readers) {
-            Optional<GlobalIndexResult> current = visitor.apply(reader);
+        List<Optional<GlobalIndexResult>> results = executeAllReaders(visitor);
+        for (Optional<GlobalIndexResult> current : results) {
             if (!current.isPresent()) {
                 continue;
             }
@@ -145,6 +162,21 @@ public class UnionGlobalIndexReader implements 
GlobalIndexReader {
         return result;
     }
 
+    private <R> List<R> executeAllReaders(Function<GlobalIndexReader, R> 
function) {
+        if (executor == null) {
+            return readers.stream().map(function).collect(Collectors.toList());
+        }
+
+        Iterator<R> iterator =
+                randomlyExecuteSequentialReturn(
+                        executor, reader -> 
singletonList(function.apply(reader)), readers);
+        List<R> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        return result;
+    }
+
     @Override
     public void close() throws IOException {
         for (GlobalIndexReader reader : readers) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 35f7a42a6e..388b7eb9de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -26,8 +26,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilderImpl;
 import org.apache.paimon.iceberg.IcebergCommitCallback;
 import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.index.IndexFileHandler;
@@ -523,15 +521,4 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
     public void setSnapshotCache(Cache<Path, Snapshot> cache) {
         this.snapshotCache = cache;
     }
-
-    @Override
-    public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
-        return new GlobalIndexScanBuilderImpl(
-                options.toConfiguration(),
-                schema.logicalRowType(),
-                fileIO,
-                pathFactory().globalIndexFileFactory(),
-                snapshotManager(),
-                newIndexFileHandler());
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 1ccd19ca11..98905b47e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -19,7 +19,6 @@
 package org.apache.paimon;
 
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestFile;
@@ -125,6 +124,4 @@ public interface FileStore<T> {
     void setManifestCache(SegmentsCache<Path> manifestCache);
 
     void setSnapshotCache(Cache<Path, Snapshot> cache);
-
-    GlobalIndexScanBuilder newGlobalIndexScanBuilder();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 84f40243f2..af169f4c6d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.globalindex;
 
-import org.apache.paimon.Snapshot;
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -36,7 +36,6 @@ import org.apache.paimon.table.source.DataTableBatchScan;
 import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
@@ -44,15 +43,14 @@ import org.apache.paimon.utils.RowRangeIndex;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
 
-import static 
org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
 import static org.apache.paimon.table.SpecialFields.ROW_ID;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 
@@ -263,39 +261,18 @@ public class DataEvolutionBatchScan implements 
DataTableScan {
         if (filter == null) {
             return Optional.empty();
         }
-        if (!table.coreOptions().globalIndexEnabled()) {
+        CoreOptions options = table.coreOptions();
+        if (!options.globalIndexEnabled()) {
             return Optional.empty();
         }
-        PartitionPredicate partitionPredicate =
+        PartitionPredicate partitionFilter =
                 batchScan.snapshotReader().manifestsReader().partitionFilter();
-        GlobalIndexScanBuilder indexScanBuilder = 
table.store().newGlobalIndexScanBuilder();
-        Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
-        
indexScanBuilder.withPartitionPredicate(partitionPredicate).withSnapshot(snapshot);
-        List<Range> indexedRowRanges = indexScanBuilder.shardList();
-        if (indexedRowRanges.isEmpty()) {
-            return Optional.empty();
-        }
-
-        Long nextRowId = Objects.requireNonNull(snapshot.nextRowId());
-        List<Range> nonIndexedRowRanges = new Range(0, nextRowId - 
1).exclude(indexedRowRanges);
-        Optional<GlobalIndexResult> resultOptional =
-                parallelScan(
-                        indexedRowRanges,
-                        indexScanBuilder,
-                        filter,
-                        table.coreOptions().globalIndexThreadNum());
-        if (!resultOptional.isPresent()) {
-            return Optional.empty();
+        try (GlobalIndexScanner scanner =
+                GlobalIndexScanner.create(table, partitionFilter, filter)) {
+            return scanner.scan(filter);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
-
-        GlobalIndexResult result = resultOptional.get();
-        if (!nonIndexedRowRanges.isEmpty()) {
-            for (Range range : nonIndexedRowRanges) {
-                result = result.or(GlobalIndexResult.fromRange(range));
-            }
-        }
-
-        return Optional.of(result);
     }
 
     @VisibleForTesting
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
deleted file mode 100644
index 5c4ec92f81..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.globalindex;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Range;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
-
-/** Builder for scanning global indexes. */
-public interface GlobalIndexScanBuilder {
-
-    GlobalIndexScanBuilder withSnapshot(long snapshotId);
-
-    GlobalIndexScanBuilder withSnapshot(Snapshot snapshot);
-
-    GlobalIndexScanBuilder withPartitionPredicate(PartitionPredicate 
partitionPredicate);
-
-    GlobalIndexScanBuilder withRowRange(Range rowRange);
-
-    RowRangeGlobalIndexScanner build();
-
-    // Return sorted and no overlap ranges
-    List<Range> shardList();
-
-    static Optional<GlobalIndexResult> parallelScan(
-            final List<Range> ranges,
-            final GlobalIndexScanBuilder globalIndexScanBuilder,
-            final Predicate filter,
-            final Integer threadNum) {
-        List<RowRangeGlobalIndexScanner> scanners =
-                ranges.stream()
-                        .map(globalIndexScanBuilder::withRowRange)
-                        .map(GlobalIndexScanBuilder::build)
-                        .collect(Collectors.toList());
-
-        try {
-            List<Optional<GlobalIndexResult>> rowsResults = new ArrayList<>();
-            Iterator<Optional<GlobalIndexResult>> resultIterators =
-                    randomlyExecuteSequentialReturn(
-                            scanner -> {
-                                Optional<GlobalIndexResult> result = 
scanner.scan(filter);
-                                return Collections.singletonList(result);
-                            },
-                            scanners,
-                            threadNum);
-            while (resultIterators.hasNext()) {
-                rowsResults.add(resultIterators.next());
-            }
-            if (rowsResults.stream().noneMatch(Optional::isPresent)) {
-                return Optional.empty();
-            }
-
-            GlobalIndexResult globalIndexResult = 
GlobalIndexResult.createEmpty();
-
-            for (int i = 0; i < ranges.size(); i++) {
-                if (rowsResults.get(i).isPresent()) {
-                    globalIndexResult = 
globalIndexResult.or(rowsResults.get(i).get());
-                } else {
-                    globalIndexResult =
-                            
globalIndexResult.or(GlobalIndexResult.fromRange(ranges.get(i)));
-                }
-            }
-            return Optional.of(globalIndexResult);
-        } finally {
-            IOUtils.closeAllQuietly(scanners);
-        }
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
deleted file mode 100644
index 4619bb77d8..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.globalindex;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.index.GlobalIndexMeta;
-import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexPathFactory;
-import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Range;
-import org.apache.paimon.utils.SnapshotManager;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/** Implementation of {@link GlobalIndexScanBuilder}. */
-public class GlobalIndexScanBuilderImpl implements GlobalIndexScanBuilder {
-
-    private final Options options;
-    private final RowType rowType;
-    private final FileIO fileIO;
-    private final IndexPathFactory indexPathFactory;
-    private final SnapshotManager snapshotManager;
-    private final IndexFileHandler indexFileHandler;
-
-    private Snapshot snapshot;
-    private PartitionPredicate partitionPredicate;
-    private Range rowRange;
-
-    public GlobalIndexScanBuilderImpl(
-            Options options,
-            RowType rowType,
-            FileIO fileIO,
-            IndexPathFactory indexPathFactory,
-            SnapshotManager snapshotManager,
-            IndexFileHandler indexFileHandler) {
-        this.options = options;
-        this.rowType = rowType;
-        this.fileIO = fileIO;
-        this.indexPathFactory = indexPathFactory;
-        this.snapshotManager = snapshotManager;
-        this.indexFileHandler = indexFileHandler;
-    }
-
-    @Override
-    public GlobalIndexScanBuilder withSnapshot(long snapshotId) {
-        this.snapshot = snapshotManager.snapshot(snapshotId);
-        return this;
-    }
-
-    @Override
-    public GlobalIndexScanBuilder withSnapshot(Snapshot snapshot) {
-        this.snapshot = snapshot;
-        return this;
-    }
-
-    @Override
-    public GlobalIndexScanBuilder withPartitionPredicate(PartitionPredicate 
partitionPredicate) {
-        this.partitionPredicate = partitionPredicate;
-        return this;
-    }
-
-    @Override
-    public GlobalIndexScanBuilder withRowRange(Range rowRange) {
-        this.rowRange = rowRange;
-        return this;
-    }
-
-    @Override
-    public RowRangeGlobalIndexScanner build() {
-        Objects.requireNonNull(rowRange, "rowRange must not be null");
-        List<IndexManifestEntry> entries = scan();
-        return new RowRangeGlobalIndexScanner(
-                options, rowType, fileIO, indexPathFactory, rowRange, entries);
-    }
-
-    @Override
-    public List<Range> shardList() {
-        Map<String, List<Range>> indexRanges = new HashMap<>();
-        for (IndexManifestEntry entry : scan()) {
-            GlobalIndexMeta globalIndexMeta = 
entry.indexFile().globalIndexMeta();
-
-            if (globalIndexMeta == null) {
-                continue;
-            }
-            long start = globalIndexMeta.rowRangeStart();
-            long end = globalIndexMeta.rowRangeEnd();
-            indexRanges
-                    .computeIfAbsent(entry.indexFile().indexType(), k -> new 
ArrayList<>())
-                    .add(new Range(start, end));
-        }
-
-        String checkIndexType = null;
-        List<Range> checkRanges = null;
-        // check all type index have same shard ranges
-        // If index a has [1,10],[20,30] and index b has [1,10],[20,25], it's 
inconsistent, because
-        // it is hard to handle the [26,30] range.
-        for (Map.Entry<String, List<Range>> rangeEntry : 
indexRanges.entrySet()) {
-            String indexType = rangeEntry.getKey();
-            List<Range> ranges = rangeEntry.getValue();
-            if (checkRanges == null) {
-                checkIndexType = indexType;
-                checkRanges = Range.sortAndMergeOverlap(ranges, true);
-            } else {
-                List<Range> merged = Range.sortAndMergeOverlap(ranges, true);
-                if (merged.size() != checkRanges.size()) {
-                    throw new IllegalStateException(
-                            "Inconsistent shard ranges among index types: "
-                                    + checkIndexType
-                                    + " vs "
-                                    + indexType);
-                }
-                for (int i = 0; i < merged.size(); i++) {
-                    Range r1 = merged.get(i);
-                    Range r2 = checkRanges.get(i);
-                    if (r1.from != r2.from || r1.to != r2.to) {
-                        throw new IllegalStateException(
-                                "Inconsistent shard ranges among index types:"
-                                        + checkIndexType
-                                        + " vs "
-                                        + indexType);
-                    }
-                }
-            }
-        }
-
-        return Range.sortAndMergeOverlap(
-                indexRanges.values().stream()
-                        .flatMap(Collection::stream)
-                        .collect(Collectors.toList()));
-    }
-
-    private List<IndexManifestEntry> scan() {
-        Filter<IndexManifestEntry> filter =
-                entry -> {
-                    if (partitionPredicate != null) {
-                        if (!partitionPredicate.test(entry.partition())) {
-                            return false;
-                        }
-                    }
-                    if (rowRange != null) {
-                        GlobalIndexMeta globalIndexMeta = 
entry.indexFile().globalIndexMeta();
-                        if (globalIndexMeta == null) {
-                            return false;
-                        }
-                        long entryStart = globalIndexMeta.rowRangeStart();
-                        long entryEnd = globalIndexMeta.rowRangeEnd();
-
-                        if (!Range.intersect(entryStart, entryEnd, 
rowRange.from, rowRange.to)) {
-                            return false;
-                        }
-                    }
-                    return true;
-                };
-
-        Snapshot snapshot =
-                this.snapshot == null ? snapshotManager.latestSnapshot() : 
this.snapshot;
-
-        return indexFileHandler.scan(snapshot, filter);
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
similarity index 67%
rename from 
paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
rename to 
paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
index 044790e7de..1312f17b43 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
@@ -26,9 +26,13 @@ import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.index.IndexPathFactory;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.ManifestReadThreadPool;
 import org.apache.paimon.utils.Range;
 
 import java.io.Closeable;
@@ -42,57 +46,46 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_THREAD_NUM;
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
+import static 
org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Scanner for shard-based global indexes. */
-public class RowRangeGlobalIndexScanner implements Closeable {
+public class GlobalIndexScanner implements Closeable {
 
     private final Options options;
+    private final ExecutorService executor;
     private final GlobalIndexEvaluator globalIndexEvaluator;
     private final IndexPathFactory indexPathFactory;
 
-    public RowRangeGlobalIndexScanner(
+    public GlobalIndexScanner(
             Options options,
             RowType rowType,
             FileIO fileIO,
             IndexPathFactory indexPathFactory,
-            Range range,
-            List<IndexManifestEntry> entries) {
+            Collection<IndexFileMeta> indexFiles) {
         this.options = options;
-        for (IndexManifestEntry entry : entries) {
-            GlobalIndexMeta meta = entry.indexFile().globalIndexMeta();
-            checkArgument(
-                    meta != null
-                            && Range.intersect(
-                                    range.from, range.to, 
meta.rowRangeStart(), meta.rowRangeEnd()),
-                    "All index files must have an intersection with row range 
["
-                            + range.from
-                            + ", "
-                            + range.to
-                            + ")");
-        }
-
+        this.executor =
+                
ManifestReadThreadPool.getExecutorService(options.get(GLOBAL_INDEX_THREAD_NUM));
         this.indexPathFactory = indexPathFactory;
-
         GlobalIndexFileReader indexFileReader = meta -> 
fileIO.newInputStream(meta.filePath());
-
         Map<Integer, Map<String, Map<Range, List<IndexFileMeta>>>> indexMetas 
= new HashMap<>();
-        for (IndexManifestEntry entry : entries) {
-            GlobalIndexMeta meta = entry.indexFile().globalIndexMeta();
-            checkArgument(meta != null, "Global index meta must not be null");
+        for (IndexFileMeta indexFile : indexFiles) {
+            GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
             int fieldId = meta.indexFieldId();
-            String indexType = entry.indexFile().indexType();
+            String indexType = indexFile.indexType();
             indexMetas
                     .computeIfAbsent(fieldId, k -> new HashMap<>())
                     .computeIfAbsent(indexType, k -> new HashMap<>())
                     .computeIfAbsent(
-                            new Range(meta.rowRangeStart(), 
meta.rowRangeStart()),
+                            new Range(meta.rowRangeStart(), 
meta.rowRangeEnd()),
                             k -> new ArrayList<>())
-                    .add(entry.indexFile());
+                    .add(indexFile);
         }
 
         IntFunction<Collection<GlobalIndexReader>> readersFunction =
@@ -104,6 +97,44 @@ public class RowRangeGlobalIndexScanner implements 
Closeable {
         this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, 
readersFunction);
     }
 
+    public static GlobalIndexScanner create(
+            FileStoreTable table, Collection<IndexFileMeta> indexFiles) {
+        return new GlobalIndexScanner(
+                table.coreOptions().toConfiguration(),
+                table.rowType(),
+                table.fileIO(),
+                table.store().pathFactory().globalIndexFileFactory(),
+                indexFiles);
+    }
+
+    public static GlobalIndexScanner create(
+            FileStoreTable table, PartitionPredicate partitionFilter, 
Predicate filter) {
+        Set<Integer> filterFieldIds =
+                collectFieldNames(filter).stream()
+                        .filter(name -> table.rowType().containsField(name))
+                        .map(name -> table.rowType().getField(name).id())
+                        .collect(Collectors.toSet());
+        Filter<IndexManifestEntry> indexFileFilter =
+                entry -> {
+                    if (partitionFilter != null && 
!partitionFilter.test(entry.partition())) {
+                        return false;
+                    }
+                    GlobalIndexMeta globalIndex = 
entry.indexFile().globalIndexMeta();
+                    if (globalIndex == null) {
+                        return false;
+                    }
+                    return filterFieldIds.contains(globalIndex.indexFieldId());
+                };
+
+        List<IndexFileMeta> indexFiles =
+                
table.store().newIndexFileHandler().scan(tryTravelOrLatest(table), 
indexFileFilter)
+                        .stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .collect(Collectors.toList());
+
+        return create(table, indexFiles);
+    }
+
     public Optional<GlobalIndexResult> scan(Predicate predicate) {
         return globalIndexEvaluator.evaluate(predicate);
     }
@@ -142,7 +173,7 @@ public class RowRangeGlobalIndexScanner implements 
Closeable {
                     unionReader.add(innerReader);
                 }
 
-                readers.add(new UnionGlobalIndexReader(unionReader));
+                readers.add(new UnionGlobalIndexReader(unionReader, executor));
             }
         } catch (IOException e) {
             throw new RuntimeException("Failed to create global index reader", 
e);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java
index 5efe920832..c468bbffb3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java
@@ -23,6 +23,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
 
 import javax.annotation.Nullable;
 
@@ -69,6 +70,10 @@ public class GlobalIndexMeta {
         return rowRangeEnd;
     }
 
+    /** Returns a {@link Range} built from {@code rowRangeStart} and {@code rowRangeEnd}. */
+    public Range rowRange() {
+        return new Range(rowRangeStart, rowRangeEnd);
+    }
+
     public int indexFieldId() {
         return indexFieldId;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 917301a2e5..6ced52bbd1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -23,7 +23,6 @@ import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestFile;
@@ -239,9 +238,4 @@ public class PrivilegedFileStore<T> implements FileStore<T> 
{
     public void setSnapshotCache(Cache<Path, Snapshot> cache) {
         wrapped.setSnapshotCache(cache);
     }
-
-    @Override
-    public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
-        return wrapped.newGlobalIndexScanBuilder();
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
index 4fc3c7aaf0..74e17e2845 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorRead.java
@@ -20,8 +20,14 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.globalindex.GlobalIndexResult;
 
+import java.util.List;
+
 /** Vector read to read index files. */
 public interface VectorRead {
 
-    GlobalIndexResult read(VectorScan.Plan plan);
+    /** Reads the splits of the given plan by delegating to {@link #read(List)}. */
+    default GlobalIndexResult read(VectorScan.Plan plan) {
+        return read(plan.splits());
+    }
+
+    /** Reads the given vector search splits and returns the resulting index result. */
+    GlobalIndexResult read(List<VectorSearchSplit> splits);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java
new file mode 100644
index 0000000000..49708a0a2a
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexScanner;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
+import org.apache.paimon.globalindex.OffsetGlobalIndexReader;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexPathFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Implementation for {@link VectorRead}.
+ *
+ * <p>{@link #read(List)} works in three steps:
+ *
+ * <ol>
+ *   <li>Pre-filter: the scalar index files of all splits are collected into one set that is
+ *       ordered (and therefore de-duplicated) by file name, and {@code filter} is evaluated
+ *       against them through a {@link GlobalIndexScanner}. When no split carries scalar index
+ *       files, or the scanner yields no result, the search runs without a row id restriction.
+ *   <li>Search: every split is evaluated through {@code randomlyExecuteSequentialReturn} with
+ *       the thread number taken from {@code globalIndexThreadNum()}. For each split the vector
+ *       index files are opened as one reader, wrapped in an {@link OffsetGlobalIndexReader}
+ *       with the split's row range, and queried with a {@link VectorSearch} that carries the
+ *       pre-filter row ids.
+ *   <li>Merge: the per-split results are combined with {@code or} and reduced with {@code
+ *       topK(limit)}.
+ * </ol>
+ *
+ * <p>The {@link GlobalIndexer} is loaded once, from the index type of the first vector index file
+ * of the first split, and reused for every split. NOTE(review): this assumes all splits share
+ * one index type - confirm against the scan side.
+ *
+ * <p>An empty split list yields {@link GlobalIndexResult#createEmpty()}. {@link IOException}s
+ * raised while scanning or reading index files are rethrown as {@link RuntimeException}.
+ */
+public class VectorReadImpl implements VectorRead {
+
+    private final FileStoreTable table;
+    private final Predicate filter;
+    private final int limit;
+    private final DataField vectorColumn;
+    private final float[] vector;
+
+    public VectorReadImpl(
+            FileStoreTable table,
+            Predicate filter,
+            int limit,
+            DataField vectorColumn,
+            float[] vector) {
+        this.table = table;
+        this.filter = filter;
+        this.limit = limit;
+        this.vectorColumn = vectorColumn;
+        this.vector = vector;
+    }
+
+    @Override
+    public GlobalIndexResult read(List<VectorSearchSplit> splits) {
+        if (splits.isEmpty()) {
+            return GlobalIndexResult.createEmpty();
+        }
+
+        RoaringNavigableMap64 preFilter = preFilter(splits).orElse(null);
+        Integer threadNum = table.coreOptions().globalIndexThreadNum();
+
+        String indexType = splits.get(0).vectorIndexFiles().get(0).indexType();
+        GlobalIndexer globalIndexer =
+                GlobalIndexerFactoryUtils.load(indexType)
+                        .create(vectorColumn, 
table.coreOptions().toConfiguration());
+        IndexPathFactory indexPathFactory = 
table.store().pathFactory().globalIndexFileFactory();
+        Iterator<Optional<ScoredGlobalIndexResult>> resultIterators =
+                randomlyExecuteSequentialReturn(
+                        split ->
+                                singletonList(
+                                        eval(
+                                                globalIndexer,
+                                                indexPathFactory,
+                                                split.rowRangeStart(),
+                                                split.rowRangeEnd(),
+                                                split.vectorIndexFiles(),
+                                                preFilter)),
+                        splits,
+                        threadNum);
+
+        ScoredGlobalIndexResult result = ScoredGlobalIndexResult.createEmpty();
+        while (resultIterators.hasNext()) {
+            Optional<ScoredGlobalIndexResult> next = resultIterators.next();
+            if (next.isPresent()) {
+                result = result.or(next.get());
+            }
+        }
+
+        return result.topK(limit);
+    }
+
+    private Optional<RoaringNavigableMap64> preFilter(List<VectorSearchSplit> 
splits) {
+        Set<IndexFileMeta> scalarIndexFiles =
+                new TreeSet<>(Comparator.comparing(IndexFileMeta::fileName));
+        for (VectorSearchSplit split : splits) {
+            scalarIndexFiles.addAll(split.scalarIndexFiles());
+        }
+        if (scalarIndexFiles.isEmpty()) {
+            return Optional.empty();
+        }
+
+        try (GlobalIndexScanner scanner = GlobalIndexScanner.create(table, 
scalarIndexFiles)) {
+            return scanner.scan(filter).map(GlobalIndexResult::results);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Optional<ScoredGlobalIndexResult> eval(
+            GlobalIndexer globalIndexer,
+            IndexPathFactory indexPathFactory,
+            long rowRangeStart,
+            long rowRangeEnd,
+            List<IndexFileMeta> vectorIndexFiles,
+            @Nullable RoaringNavigableMap64 includeRowIds) {
+        List<GlobalIndexIOMeta> indexIOMetaList = new ArrayList<>();
+        for (IndexFileMeta indexFile : vectorIndexFiles) {
+            GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
+            indexIOMetaList.add(
+                    new GlobalIndexIOMeta(
+                            indexPathFactory.toPath(indexFile),
+                            indexFile.fileSize(),
+                            meta.indexMeta()));
+        }
+        @SuppressWarnings("resource")
+        FileIO fileIO = table.fileIO();
+        GlobalIndexFileReader indexFileReader = m -> 
fileIO.newInputStream(m.filePath());
+        try (GlobalIndexReader reader =
+                globalIndexer.createReader(indexFileReader, indexIOMetaList)) {
+            VectorSearch vectorSearch =
+                    new VectorSearch(vector, limit, vectorColumn.name())
+                            .withIncludeRowIds(includeRowIds);
+            return new OffsetGlobalIndexReader(reader, rowRangeStart, 
rowRangeEnd)
+                    .visitVectorSearch(vectorSearch);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
index 57aaf9fd4a..80f667cc0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScan.java
@@ -18,12 +18,6 @@
 
 package org.apache.paimon.table.source;
 
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.index.IndexFileMetaSerializer;
-import org.apache.paimon.utils.RoaringNavigableMap64;
-
-import javax.annotation.Nullable;
-
 import java.util.List;
 
 /** Vector scan to pre-filter and scan index files. */
@@ -33,15 +27,6 @@ public interface VectorScan {
 
     /** Plan of vector scan. */
     interface Plan {
-
-        /**
-         * Index files for vector index. {@link IndexFileMeta} can be 
serialized by {@link
-         * IndexFileMetaSerializer}.
-         */
-        List<IndexFileMeta> indexFiles();
-
-        /** Include row ids generated by pre-filters. */
-        @Nullable
-        RoaringNavigableMap64 includeRowIds();
+        List<VectorSearchSplit> splits();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java
new file mode 100644
index 0000000000..d3db6dd13d
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Range;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Implementation for {@link VectorScan}.
+ *
+ * <p>{@link #scan()} plans a vector search as follows:
+ *
+ * <ol>
+ *   <li>Resolve the field names referenced by {@code filter} to field ids, skipping names that
+ *       the table row type does not contain.
+ *   <li>Scan the index manifest of the snapshot returned by {@code
+ *       TimeTravelUtil.tryTravelOrLatest(table)}, keeping only entries that pass {@code
+ *       partitionFilter} (when it is not null), carry a {@link GlobalIndexMeta}, and index
+ *       either the vector column or one of the filter fields.
+ *   <li>Group the vector column's index files by the {@link Range} built from their row range
+ *       start and end.
+ *   <li>Emit one {@link VectorSearchSplit} per range, holding that range's vector index files
+ *       plus every non-vector index file whose row range intersects it.
+ * </ol>
+ *
+ * <p>Ranges are grouped in a {@link HashMap}, so the order of the returned splits is
+ * unspecified. {@code scan()} fails with a {@link NullPointerException} when the vector column is
+ * null.
+ */
+public class VectorScanImpl implements VectorScan {
+
+    private final FileStoreTable table;
+    private final PartitionPredicate partitionFilter;
+    private final Predicate filter;
+    private final DataField vectorColumn;
+
+    public VectorScanImpl(
+            FileStoreTable table,
+            PartitionPredicate partitionFilter,
+            Predicate filter,
+            DataField vectorColumn) {
+        this.table = table;
+        this.partitionFilter = partitionFilter;
+        this.filter = filter;
+        this.vectorColumn = vectorColumn;
+    }
+
+    @Override
+    public Plan scan() {
+        Objects.requireNonNull(vectorColumn, "Vector column must be set");
+
+        Set<Integer> filterFieldIds =
+                collectFieldNames(filter).stream()
+                        .filter(name -> table.rowType().containsField(name))
+                        .map(name -> table.rowType().getField(name).id())
+                        .collect(Collectors.toSet());
+        Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
+        IndexFileHandler indexFileHandler = 
table.store().newIndexFileHandler();
+        Filter<IndexManifestEntry> indexFileFilter =
+                entry -> {
+                    if (partitionFilter != null && 
!partitionFilter.test(entry.partition())) {
+                        return false;
+                    }
+                    GlobalIndexMeta globalIndex = 
entry.indexFile().globalIndexMeta();
+                    if (globalIndex == null) {
+                        return false;
+                    }
+                    int fieldId = globalIndex.indexFieldId();
+                    return vectorColumn.id() == fieldId || 
filterFieldIds.contains(fieldId);
+                };
+
+        List<IndexFileMeta> allIndexFiles =
+                indexFileHandler.scan(snapshot, indexFileFilter).stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .collect(Collectors.toList());
+
+        // Group vector index files by (rowRangeStart, rowRangeEnd)
+        Map<Range, List<IndexFileMeta>> vectorByRange = new HashMap<>();
+        for (IndexFileMeta indexFile : allIndexFiles) {
+            GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
+            if (meta.indexFieldId() == vectorColumn.id()) {
+                Range range = new Range(meta.rowRangeStart(), 
meta.rowRangeEnd());
+                vectorByRange.computeIfAbsent(range, k -> new 
ArrayList<>()).add(indexFile);
+            }
+        }
+
+        // Build splits: for each vector range, attach matching scalar index 
files
+        List<VectorSearchSplit> splits = new ArrayList<>();
+        for (Map.Entry<Range, List<IndexFileMeta>> entry : 
vectorByRange.entrySet()) {
+            Range range = entry.getKey();
+            List<IndexFileMeta> vectorFiles = entry.getValue();
+            List<IndexFileMeta> scalarFiles =
+                    allIndexFiles.stream()
+                            .filter(
+                                    f -> {
+                                        GlobalIndexMeta globalIndex =
+                                                
checkNotNull(f.globalIndexMeta());
+                                        if (globalIndex.indexFieldId() == 
vectorColumn.id()) {
+                                            return false;
+                                        }
+                                        return 
range.hasIntersection(globalIndex.rowRange());
+                                    })
+                            .collect(Collectors.toList());
+            splits.add(new VectorSearchSplit(range.from, range.to, 
vectorFiles, scalarFiles));
+        }
+
+        return () -> splits;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
index 06e6ee775b..beb7844e13 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java
@@ -18,46 +18,14 @@
 
 package org.apache.paimon.table.source;
 
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexIOMeta;
-import org.apache.paimon.globalindex.GlobalIndexReader;
-import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.GlobalIndexerFactory;
-import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
-import org.apache.paimon.globalindex.OffsetGlobalIndexReader;
-import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
-import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
-import org.apache.paimon.index.GlobalIndexMeta;
-import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.VectorSearch;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.InnerTable;
-import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RoaringNavigableMap64;
 
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static java.util.Collections.singletonList;
 import static 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicate;
-import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 
 /** Implementation for {@link VectorSearchBuilder}. */
 public class VectorSearchBuilderImpl implements VectorSearchBuilder {
@@ -114,113 +82,11 @@ public class VectorSearchBuilderImpl implements 
VectorSearchBuilder {
 
     @Override
     public VectorScan newVectorScan() {
-        return new VectorScanImpl();
+        return new VectorScanImpl(table, partitionFilter, filter, 
vectorColumn);
     }
 
     @Override
     public VectorRead newVectorRead() {
-        return new VectorReadImpl();
-    }
-
-    private class VectorScanImpl implements VectorScan {
-
-        @Override
-        public Plan scan() {
-            Objects.requireNonNull(vector, "Vector must be set");
-            Objects.requireNonNull(vectorColumn, "Vector column must be set");
-            if (limit <= 0) {
-                throw new IllegalArgumentException("Limit must be positive");
-            }
-
-            Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
-            IndexFileHandler indexFileHandler = 
table.store().newIndexFileHandler();
-            Filter<IndexManifestEntry> indexFileFilter =
-                    partitionFilter == null
-                            ? Filter.alwaysTrue()
-                            : entry -> partitionFilter.test(entry.partition());
-            List<IndexManifestEntry> indexManifestEntries =
-                    indexFileHandler.scan(snapshot, indexFileFilter);
-
-            List<IndexFileMeta> vectorIndexFiles =
-                    indexManifestEntries.stream()
-                            .map(IndexManifestEntry::indexFile)
-                            .filter(
-                                    indexFile -> {
-                                        GlobalIndexMeta indexMeta = 
indexFile.globalIndexMeta();
-                                        if (indexMeta == null) {
-                                            return false;
-                                        }
-                                        return indexMeta.indexFieldId() == 
vectorColumn.id();
-                                    })
-                            .collect(Collectors.toList());
-
-            return new Plan() {
-                @Override
-                public List<IndexFileMeta> indexFiles() {
-                    return vectorIndexFiles;
-                }
-
-                @Override
-                public @Nullable RoaringNavigableMap64 includeRowIds() {
-                    // TODO pre filter by btree index
-                    return null;
-                }
-            };
-        }
-    }
-
-    private class VectorReadImpl implements VectorRead {
-
-        @Override
-        public GlobalIndexResult read(VectorScan.Plan plan) {
-            List<IndexFileMeta> indexFiles = plan.indexFiles();
-            if (indexFiles.isEmpty()) {
-                return GlobalIndexResult.createEmpty();
-            }
-
-            Integer threadNum = table.coreOptions().globalIndexThreadNum();
-            Iterator<Optional<ScoredGlobalIndexResult>> resultIterators =
-                    randomlyExecuteSequentialReturn(
-                            vectorIndex -> singletonList(eval(vectorIndex, 
plan.includeRowIds())),
-                            indexFiles,
-                            threadNum);
-
-            ScoredGlobalIndexResult result = 
ScoredGlobalIndexResult.createEmpty();
-            while (resultIterators.hasNext()) {
-                Optional<ScoredGlobalIndexResult> next = 
resultIterators.next();
-                if (next.isPresent()) {
-                    result = result.or(next.get());
-                }
-            }
-
-            return result.topK(limit);
-        }
-
-        private Optional<ScoredGlobalIndexResult> eval(
-                IndexFileMeta indexFile, @Nullable RoaringNavigableMap64 
includeRowIds) {
-            GlobalIndexerFactory globalIndexerFactory =
-                    GlobalIndexerFactoryUtils.load(indexFile.indexType());
-            GlobalIndexer globalIndexer =
-                    globalIndexerFactory.create(
-                            vectorColumn, 
table.coreOptions().toConfiguration());
-            GlobalIndexMeta meta = indexFile.globalIndexMeta();
-            Preconditions.checkNotNull(meta);
-            Path filePath = 
table.store().pathFactory().globalIndexFileFactory().toPath(indexFile);
-            GlobalIndexIOMeta globalIndexIOMeta =
-                    new GlobalIndexIOMeta(filePath, indexFile.fileSize(), 
meta.indexMeta());
-            @SuppressWarnings("resource")
-            FileIO fileIO = table.fileIO();
-            GlobalIndexFileReader indexFileReader = m -> 
fileIO.newInputStream(m.filePath());
-            try (GlobalIndexReader reader =
-                    globalIndexer.createReader(indexFileReader, 
singletonList(globalIndexIOMeta))) {
-                VectorSearch vectorSearch =
-                        new VectorSearch(vector, limit, vectorColumn.name())
-                                .withIncludeRowIds(includeRowIds);
-                return new OffsetGlobalIndexReader(reader, 
meta.rowRangeStart(), meta.rowRangeEnd())
-                        .visitVectorSearch(vectorSearch);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
+        return new VectorReadImpl(table, filter, limit, vectorColumn, vector);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchSplit.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchSplit.java
new file mode 100644
index 0000000000..032c2be301
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchSplit.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Split of vector search.
+ *
+ * <p>A split carries a row range ({@code rowRangeStart}, {@code rowRangeEnd}) together with the
+ * vector index files and the scalar index files attached to it.
+ *
+ * <p>All instance fields are {@code transient}; Java serialization goes through the custom {@code
+ * writeObject}/{@code readObject} pair, which writes a version number, the two range bounds and
+ * then both file lists via {@link IndexFileMetaSerializer}. Reading a stream whose version
+ * differs from the current one fails with an {@link IOException}.
+ *
+ * <p>The serializer is held in a {@link ThreadLocal}, so each thread uses its own instance.
+ *
+ * <p>Two splits are equal when their range bounds and both file lists are equal.
+ */
+public class VectorSearchSplit implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int VERSION = 1;
+
+    private static final ThreadLocal<IndexFileMetaSerializer> INDEX_SERIALIZER 
=
+            ThreadLocal.withInitial(IndexFileMetaSerializer::new);
+
+    private transient long rowRangeStart;
+    private transient long rowRangeEnd;
+    private transient List<IndexFileMeta> vectorIndexFiles;
+    private transient List<IndexFileMeta> scalarIndexFiles;
+
+    public VectorSearchSplit(
+            long rowRangeStart,
+            long rowRangeEnd,
+            List<IndexFileMeta> vectorIndexFiles,
+            List<IndexFileMeta> scalarIndexFiles) {
+        this.rowRangeStart = rowRangeStart;
+        this.rowRangeEnd = rowRangeEnd;
+        this.vectorIndexFiles = vectorIndexFiles;
+        this.scalarIndexFiles = scalarIndexFiles;
+    }
+
+    public long rowRangeStart() {
+        return rowRangeStart;
+    }
+
+    public long rowRangeEnd() {
+        return rowRangeEnd;
+    }
+
+    public List<IndexFileMeta> vectorIndexFiles() {
+        return vectorIndexFiles;
+    }
+
+    public List<IndexFileMeta> scalarIndexFiles() {
+        return scalarIndexFiles;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        out.writeInt(VERSION);
+        IndexFileMetaSerializer serializer = INDEX_SERIALIZER.get();
+        DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+        view.writeLong(rowRangeStart);
+        view.writeLong(rowRangeEnd);
+        serializer.serializeList(vectorIndexFiles, view);
+        serializer.serializeList(scalarIndexFiles, view);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        in.defaultReadObject();
+        int version = in.readInt();
+        if (version != VERSION) {
+            throw new IOException("Unsupported VectorSearchSplit version: " + 
version);
+        }
+        IndexFileMetaSerializer serializer = INDEX_SERIALIZER.get();
+        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+        this.rowRangeStart = view.readLong();
+        this.rowRangeEnd = view.readLong();
+        this.vectorIndexFiles = serializer.deserializeList(view);
+        this.scalarIndexFiles = serializer.deserializeList(view);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        VectorSearchSplit that = (VectorSearchSplit) o;
+        return rowRangeStart == that.rowRangeStart
+                && rowRangeEnd == that.rowRangeEnd
+                && Objects.equals(vectorIndexFiles, that.vectorIndexFiles)
+                && Objects.equals(scalarIndexFiles, that.scalarIndexFiles);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowRangeStart, rowRangeEnd, vectorIndexFiles, 
scalarIndexFiles);
+    }
+
+    @Override
+    public String toString() {
+        return "VectorSearchSplit{"
+                + "rowRangeStart="
+                + rowRangeStart
+                + ", rowRangeEnd="
+                + rowRangeEnd
+                + ", vectorIndexFiles="
+                + vectorIndexFiles
+                + ", scalarIndexFiles="
+                + scalarIndexFiles
+                + '}';
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
index 574579965d..cd888e6982 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
@@ -25,19 +25,19 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.globalindex.DataEvolutionBatchScan;
 import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
 import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
+import org.apache.paimon.globalindex.GlobalIndexScanner;
 import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
 import org.apache.paimon.globalindex.GlobalIndexer;
 import org.apache.paimon.globalindex.GlobalIndexerFactory;
 import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
 import org.apache.paimon.globalindex.ResultEntry;
-import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
 import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
 import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
@@ -56,7 +56,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -248,20 +247,9 @@ public class BitmapGlobalIndexTableTest extends 
DataEvolutionTestBase {
 
     private RoaringNavigableMap64 globalIndexScan(FileStoreTable table, 
Predicate predicate)
             throws Exception {
-        GlobalIndexScanBuilder indexScanBuilder = 
table.store().newGlobalIndexScanBuilder();
-        List<Range> ranges = indexScanBuilder.shardList();
-        GlobalIndexResult globalFileIndexResult = 
GlobalIndexResult.createEmpty();
-        for (Range range : ranges) {
-            try (RowRangeGlobalIndexScanner scanner =
-                    indexScanBuilder.withRowRange(range).build()) {
-                Optional<GlobalIndexResult> globalIndexResult = 
scanner.scan(predicate);
-                if (!globalIndexResult.isPresent()) {
-                    throw new RuntimeException("Can't find index result by 
scan");
-                }
-                globalFileIndexResult = 
globalFileIndexResult.or(globalIndexResult.get());
-            }
+        try (GlobalIndexScanner scanner =
+                GlobalIndexScanner.create(table, 
PartitionPredicate.ALWAYS_TRUE, predicate)) {
+            return scanner.scan(predicate).get().results();
         }
-
-        return globalFileIndexResult.results();
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index f524e07e73..98f730359d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -22,11 +22,10 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.globalindex.DataEvolutionBatchScan;
 import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
+import org.apache.paimon.globalindex.GlobalIndexScanner;
 import org.apache.paimon.globalindex.IndexedSplit;
-import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
 import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
-import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -44,9 +43,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -172,44 +169,6 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
         assertThat(result).containsExactly("a200", "a56789");
     }
 
-    @Test
-    public void testBtreeWithNonIndexedRowRange() throws Exception {
-        write(10L);
-        append(0, 10);
-
-        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
-        createIndex("f1", Collections.singletonList(new Range(0L, 9L)));
-
-        assertThat(table.store().newGlobalIndexScanBuilder().shardList())
-                .containsExactly(new Range(0L, 9L));
-
-        Predicate predicate =
-                new PredicateBuilder(table.rowType()).equal(1, 
BinaryString.fromString("a5"));
-        ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
-
-        List<Split> splits = readBuilder.newScan().plan().splits();
-        assertThat(splits).hasSize(1);
-
-        IndexedSplit indexedSplit = (IndexedSplit) splits.get(0);
-        assertThat(indexedSplit.rowRanges())
-                .containsExactly(new Range(5L, 5L), new Range(10L, 19L));
-        assertThat(
-                        indexedSplit.dataSplit().dataFiles().stream()
-                                .map(DataFileMeta::firstRowId)
-                                .distinct()
-                                .sorted()
-                                .collect(Collectors.toList()))
-                .containsExactly(0L, 10L);
-
-        List<String> result = new ArrayList<>();
-        readBuilder
-                .newRead()
-                .createReader(splits)
-                .forEachRemaining(row -> 
result.add(row.getString(1).toString()));
-        assertThat(result)
-                .containsExactly("a5", "a0", "a1", "a2", "a3", "a4", "a5", 
"a6", "a7", "a8", "a9");
-    }
-
     private void createIndex(String fieldName) throws Exception {
         createIndex(fieldName, null);
     }
@@ -266,20 +225,9 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
 
     private RoaringNavigableMap64 globalIndexScan(FileStoreTable table, 
Predicate predicate)
             throws Exception {
-        GlobalIndexScanBuilder indexScanBuilder = 
table.store().newGlobalIndexScanBuilder();
-        List<Range> ranges = indexScanBuilder.shardList();
-        GlobalIndexResult globalFileIndexResult = 
GlobalIndexResult.createEmpty();
-        for (Range range : ranges) {
-            try (RowRangeGlobalIndexScanner scanner =
-                    indexScanBuilder.withRowRange(range).build()) {
-                Optional<GlobalIndexResult> globalIndexResult = 
scanner.scan(predicate);
-                if (!globalIndexResult.isPresent()) {
-                    throw new RuntimeException("Can't find index result by 
scan");
-                }
-                globalFileIndexResult = 
globalFileIndexResult.or(globalIndexResult.get());
-            }
+        try (GlobalIndexScanner scanner =
+                GlobalIndexScanner.create(table, 
PartitionPredicate.ALWAYS_TRUE, predicate)) {
+            return scanner.scan(predicate).get().results();
         }
-
-        return globalFileIndexResult.results();
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
index 30f9c4b107..79928bacca 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java
@@ -25,16 +25,20 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
 import org.apache.paimon.globalindex.GlobalIndexResult;
 import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
 import org.apache.paimon.globalindex.ResultEntry;
 import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
 import org.apache.paimon.globalindex.testvector.TestVectorGlobalIndexerFactory;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
@@ -52,6 +56,10 @@ import org.apache.paimon.utils.Range;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -355,6 +363,150 @@ public class VectorSearchBuilderTest extends 
TableTestBase {
                                 + result2.results().getIntCardinality());
     }
 
+    @Test
+    public void testScanPartialRangeIntersection() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+
+        // Write 10 rows
+        float[][] allVectors = new float[10][];
+        for (int i = 0; i < 10; i++) {
+            allVectors[i] = new float[] {(float) Math.cos(i * 0.3), (float) 
Math.sin(i * 0.3)};
+        }
+        writeVectors(table, allVectors);
+
+        // Build ONE vector index covering full range [0,9]
+        buildAndCommitVectorIndex(table, allVectors, new Range(0, 9));
+
+        // Build ONE btree index covering partial range [3,7]
+        buildAndCommitBTreeIndex(table, new int[] {3, 4, 5, 6, 7}, new 
Range(3, 7));
+
+        // VectorScanImpl should attach scalar index because [3,7] intersects 
[0,9]
+        Predicate idFilter = new 
PredicateBuilder(table.rowType()).greaterOrEqual(0, 5);
+        VectorScan.Plan plan =
+                table.newVectorSearchBuilder()
+                        .withVector(new float[] {1.0f, 0.0f})
+                        .withLimit(5)
+                        .withVectorColumn(VECTOR_FIELD_NAME)
+                        .withFilter(idFilter)
+                        .newVectorScan()
+                        .scan();
+
+        assertThat(plan.splits()).hasSize(1);
+        VectorSearchSplit split = plan.splits().get(0);
+        assertThat(split.rowRangeStart()).isEqualTo(0);
+        assertThat(split.rowRangeEnd()).isEqualTo(9);
+        assertThat(split.vectorIndexFiles()).isNotEmpty();
+        // Scalar index [3,7] intersects vector range [0,9] → attached
+        assertThat(split.scalarIndexFiles()).isNotEmpty();
+
+        // Read with pre-filter: id >= 5, btree covers [3,7] so rows 5,6,7 
from btree
+        GlobalIndexResult result =
+                table.newVectorSearchBuilder()
+                        .withVector(new float[] {1.0f, 0.0f})
+                        .withLimit(5)
+                        .withVectorColumn(VECTOR_FIELD_NAME)
+                        .withFilter(idFilter)
+                        .newVectorRead()
+                        .read(plan);
+
+        assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class);
+        assertThat(result.results().isEmpty()).isFalse();
+        // Pre-filter restricts to rows matching id >= 5 from btree [3,7]
+        for (long rowId : result.results()) {
+            assertThat(rowId).isBetween(5L, 7L);
+        }
+    }
+
+    @Test
+    public void testPreFilterMatchesZeroRows() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+
+        float[][] vectors = {
+            {1.0f, 0.0f},
+            {0.95f, 0.1f},
+            {0.0f, 1.0f},
+            {0.1f, 0.95f}
+        };
+        writeVectors(table, vectors);
+
+        Range range = new Range(0, 3);
+        buildAndCommitVectorIndex(table, vectors, range);
+        buildAndCommitBTreeIndex(table, new int[] {0, 1, 2, 3}, range);
+
+        // Filter id > 100: btree covers ids 0-3, so preFilter matches zero 
rows
+        Predicate impossibleFilter = new 
PredicateBuilder(table.rowType()).greaterThan(0, 100);
+        VectorSearchBuilder searchBuilder =
+                table.newVectorSearchBuilder()
+                        .withVector(new float[] {1.0f, 0.0f})
+                        .withLimit(4)
+                        .withVectorColumn(VECTOR_FIELD_NAME)
+                        .withFilter(impossibleFilter);
+
+        VectorScan.Plan plan = searchBuilder.newVectorScan().scan();
+        assertThat(plan.splits()).hasSize(1);
+        // Scalar index is attached since field matches filter
+        assertThat(plan.splits().get(0).scalarIndexFiles()).isNotEmpty();
+
+        // Read: preFilter returns empty bitmap → vector search returns no 
results
+        GlobalIndexResult result = searchBuilder.newVectorRead().read(plan);
+        assertThat(result.results().isEmpty()).isTrue();
+    }
+
+    @Test
+    public void testVectorSearchSplitSerialization() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+
+        float[][] vectors = {{1.0f, 0.0f}, {0.0f, 1.0f}};
+        writeVectors(table, vectors);
+
+        Range range = new Range(0, 1);
+        buildAndCommitVectorIndex(table, vectors, range);
+        buildAndCommitBTreeIndex(table, new int[] {0, 1}, range);
+
+        Predicate filter = new 
PredicateBuilder(table.rowType()).greaterOrEqual(0, 0);
+        VectorScan.Plan plan =
+                table.newVectorSearchBuilder()
+                        .withVector(new float[] {1.0f, 0.0f})
+                        .withLimit(2)
+                        .withVectorColumn(VECTOR_FIELD_NAME)
+                        .withFilter(filter)
+                        .newVectorScan()
+                        .scan();
+
+        assertThat(plan.splits()).hasSize(1);
+        VectorSearchSplit original = plan.splits().get(0);
+
+        // Serialize
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
+            out.writeObject(original);
+        }
+
+        // Deserialize
+        VectorSearchSplit deserialized;
+        try (ObjectInputStream in =
+                new ObjectInputStream(new 
ByteArrayInputStream(bos.toByteArray()))) {
+            deserialized = (VectorSearchSplit) in.readObject();
+        }
+
+        // Verify all fields match
+        
assertThat(deserialized.rowRangeStart()).isEqualTo(original.rowRangeStart());
+        
assertThat(deserialized.rowRangeEnd()).isEqualTo(original.rowRangeEnd());
+        
assertThat(deserialized.vectorIndexFiles()).hasSize(original.vectorIndexFiles().size());
+        
assertThat(deserialized.scalarIndexFiles()).hasSize(original.scalarIndexFiles().size());
+        for (int i = 0; i < original.vectorIndexFiles().size(); i++) {
+            assertThat(deserialized.vectorIndexFiles().get(i).fileName())
+                    .isEqualTo(original.vectorIndexFiles().get(i).fileName());
+        }
+        for (int i = 0; i < original.scalarIndexFiles().size(); i++) {
+            assertThat(deserialized.scalarIndexFiles().get(i).fileName())
+                    .isEqualTo(original.scalarIndexFiles().get(i).fileName());
+        }
+    }
+
     // ====================== Helper methods ======================
 
     private void writeVectors(FileStoreTable table, float[][] vectors) throws 
Exception {
@@ -490,6 +642,169 @@ public class VectorSearchBuilderTest extends 
TableTestBase {
         }
     }
 
+    @Test
+    public void testVectorSearchWithBTreePreFilter() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+
+        // Write 10 rows: ids 0-9 with vectors
+        // Rows 0-4: vectors near (1,0)
+        // Rows 5-9: vectors near (0,1)
+        float[][] allVectors = {
+            {1.0f, 0.0f}, // row 0
+            {0.95f, 0.1f}, // row 1
+            {0.98f, 0.05f}, // row 2
+            {0.9f, 0.15f}, // row 3
+            {0.85f, 0.2f}, // row 4
+            {0.0f, 1.0f}, // row 5
+            {0.1f, 0.95f}, // row 6
+            {0.05f, 0.98f}, // row 7
+            {0.15f, 0.9f}, // row 8
+            {0.2f, 0.85f} // row 9
+        };
+        writeVectors(table, allVectors);
+
+        Range range1 = new Range(0, 4);
+        Range range2 = new Range(5, 9);
+
+        // Build two vector index files, one for each range
+        buildAndCommitVectorIndex(
+                table,
+                new float[][] {
+                    allVectors[0], allVectors[1], allVectors[2], 
allVectors[3], allVectors[4]
+                },
+                range1);
+        buildAndCommitVectorIndex(
+                table,
+                new float[][] {
+                    allVectors[5], allVectors[6], allVectors[7], 
allVectors[8], allVectors[9]
+                },
+                range2);
+
+        // Build two btree indexes on the 'id' field, one for each range
+        buildAndCommitBTreeIndex(table, new int[] {0, 1, 2, 3, 4}, range1);
+        buildAndCommitBTreeIndex(table, new int[] {5, 6, 7, 8, 9}, range2);
+
+        // --- Test VectorScanImpl: verify splits contain scalar index files 
---
+        Predicate idFilter = new 
PredicateBuilder(table.rowType()).greaterOrEqual(0, 5);
+        VectorSearchBuilder searchBuilder =
+                table.newVectorSearchBuilder()
+                        .withVector(new float[] {0.1f, 0.9f})
+                        .withLimit(5)
+                        .withVectorColumn(VECTOR_FIELD_NAME)
+                        .withFilter(idFilter);
+
+        VectorScan.Plan plan = searchBuilder.newVectorScan().scan();
+        assertThat(plan.splits()).isNotEmpty();
+        // Every split should have vector index files
+        for (VectorSearchSplit split : plan.splits()) {
+            assertThat(split.vectorIndexFiles()).isNotEmpty();
+        }
+        // At least one split should have scalar (btree) index files
+        long scalarCount =
+                plan.splits().stream().filter(s -> 
!s.scalarIndexFiles().isEmpty()).count();
+        assertThat(scalarCount).isGreaterThan(0);
+
+        // --- Test VectorReadImpl: pre-filter should narrow results ---
+        // Query vector near (0,1) with filter id >= 5
+        // Without filter: rows 5,6,7,8,9 are closest
+        // With filter id >= 5: btree pre-filter restricts to rows 5-9
+        GlobalIndexResult resultWithFilter = 
searchBuilder.newVectorRead().read(plan);
+        
assertThat(resultWithFilter).isInstanceOf(ScoredGlobalIndexResult.class);
+        assertThat(resultWithFilter.results().isEmpty()).isFalse();
+        for (long rowId : resultWithFilter.results()) {
+            assertThat(rowId).isBetween(5L, 9L);
+        }
+
+        // Compare with no-filter search (should include results from both 
ranges)
+        GlobalIndexResult resultNoFilter =
+                table.newVectorSearchBuilder()
+                        .withVector(new float[] {0.1f, 0.9f})
+                        .withLimit(10)
+                        .withVectorColumn(VECTOR_FIELD_NAME)
+                        .executeLocal();
+        assertThat(resultNoFilter.results().getIntCardinality())
+                .isGreaterThan(resultWithFilter.results().getIntCardinality());
+    }
+
+    private void buildAndCommitVectorIndex(FileStoreTable table, float[][] 
vectors, Range rowRange)
+            throws Exception {
+        Options options = table.coreOptions().toConfiguration();
+        DataField vectorField = table.rowType().getField(VECTOR_FIELD_NAME);
+
+        GlobalIndexSingletonWriter writer =
+                (GlobalIndexSingletonWriter)
+                        GlobalIndexBuilderUtils.createIndexWriter(
+                                table,
+                                TestVectorGlobalIndexerFactory.IDENTIFIER,
+                                vectorField,
+                                options);
+        for (float[] vec : vectors) {
+            writer.write(vec);
+        }
+        List<ResultEntry> entries = writer.finish();
+
+        List<IndexFileMeta> indexFiles =
+                GlobalIndexBuilderUtils.toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange,
+                        vectorField.id(),
+                        TestVectorGlobalIndexerFactory.IDENTIFIER,
+                        entries);
+
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+        CommitMessage message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement());
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.commit(Collections.singletonList(message));
+        }
+    }
+
+    private void buildAndCommitBTreeIndex(FileStoreTable table, int[] ids, 
Range rowRange)
+            throws Exception {
+        Options options = table.coreOptions().toConfiguration();
+        DataField idField = table.rowType().getField("id");
+
+        GlobalIndexParallelWriter writer =
+                (GlobalIndexParallelWriter)
+                        GlobalIndexBuilderUtils.createIndexWriter(
+                                table, BTreeGlobalIndexerFactory.IDENTIFIER, 
idField, options);
+        for (int id : ids) {
+            long relativeRowId = id - rowRange.from;
+            writer.write(id, relativeRowId);
+        }
+        List<ResultEntry> entries = writer.finish();
+
+        List<IndexFileMeta> indexFiles =
+                GlobalIndexBuilderUtils.toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange,
+                        idField.id(),
+                        BTreeGlobalIndexerFactory.IDENTIFIER,
+                        entries);
+
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+        CommitMessage message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement());
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.commit(Collections.singletonList(message));
+        }
+    }
+
     private void buildAndCommitPartitionedIndex(
             FileStoreTable table, float[][] vectors, BinaryRow partition, 
Range rowRange)
             throws Exception {
diff --git a/paimon-python/pypaimon/globalindex/__init__.py 
b/paimon-python/pypaimon/globalindex/__init__.py
index c3311655c8..e96d2d6a80 100644
--- a/paimon-python/pypaimon/globalindex/__init__.py
+++ b/paimon-python/pypaimon/globalindex/__init__.py
@@ -26,10 +26,7 @@ from pypaimon.globalindex.vector_search_result import (
 )
 from pypaimon.globalindex.global_index_meta import GlobalIndexMeta, 
GlobalIndexIOMeta
 from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
-from pypaimon.globalindex.global_index_scan_builder import (
-    GlobalIndexScanBuilder,
-    RowRangeGlobalIndexScanner,
-)
+from pypaimon.globalindex.global_index_scanner import GlobalIndexScanner
 from pypaimon.utils.range import Range
 
 __all__ = [
@@ -43,7 +40,6 @@ __all__ = [
     'GlobalIndexMeta',
     'GlobalIndexIOMeta',
     'GlobalIndexEvaluator',
-    'GlobalIndexScanBuilder',
-    'RowRangeGlobalIndexScanner',
+    'GlobalIndexScanner',
     'Range',
 ]
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py 
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index 1692113d64..bbab92e883 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -23,7 +23,6 @@ from typing import Callable, Collection, Dict, List, Optional
 from pypaimon.globalindex.global_index_reader import GlobalIndexReader, 
FieldRef
 from pypaimon.globalindex.global_index_result import GlobalIndexResult
 from pypaimon.common.predicate import Predicate
-from pypaimon.globalindex.vector_search import VectorSearch
 from pypaimon.schema.data_types import DataField
 
 
@@ -44,8 +43,7 @@ class GlobalIndexEvaluator:
 
     def evaluate(
         self,
-        predicate: Optional[Predicate],
-        vector_search: Optional[VectorSearch]
+        predicate: Optional[Predicate]
     ) -> Optional[GlobalIndexResult]:
         compound_result: Optional[GlobalIndexResult] = None
         
@@ -53,36 +51,6 @@ class GlobalIndexEvaluator:
         if predicate is not None:
             compound_result = self._visit_predicate(predicate)
         
-        # Evaluate vector search
-        if vector_search is not None:
-            field = self._field_by_name.get(vector_search.field_name)
-            if field is None:
-                raise ValueError(f"Field not found: 
{vector_search.field_name}")
-            
-            field_id = field.id
-            readers = self._index_readers_cache.get(field_id)
-            if readers is None:
-                readers = self._readers_function(field)
-                self._index_readers_cache[field_id] = readers
-            
-            # If we have a compound result from predicates, use it to filter 
vector search
-            if compound_result is not None:
-                vector_search = 
vector_search.with_include_row_ids(compound_result.results())
-            
-            for reader in readers:
-                child_result = vector_search.visit(reader)
-                if child_result is None:
-                    continue
-                
-                # AND operation
-                if compound_result is not None:
-                    compound_result = compound_result.and_(child_result)
-                else:
-                    compound_result = child_result
-                
-                if compound_result.is_empty():
-                    return compound_result
-        
         return compound_result
 
     def _visit_predicate(self, predicate: Predicate) -> 
Optional[GlobalIndexResult]:
@@ -141,7 +109,7 @@ class GlobalIndexEvaluator:
                 continue
             
             if compound_result is not None:
-                compound_result = compound_result.and_(child_result)
+                compound_result = compound_result.or_(child_result)
             else:
                 compound_result = child_result
             
diff --git a/paimon-python/pypaimon/globalindex/global_index_reader.py 
b/paimon-python/pypaimon/globalindex/global_index_reader.py
index ce70b5d6c1..cb9760ed2d 100644
--- a/paimon-python/pypaimon/globalindex/global_index_reader.py
+++ b/paimon-python/pypaimon/globalindex/global_index_reader.py
@@ -19,11 +19,7 @@
 """Global index reader interface."""
 
 from abc import ABC, abstractmethod
-from typing import List, Optional, TYPE_CHECKING
-
-if TYPE_CHECKING:
-    from pypaimon.globalindex.global_index_result import GlobalIndexResult
-    from pypaimon.globalindex.vector_search import VectorSearch
+from typing import List, Optional
 
 
 class FieldRef:
diff --git a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py 
b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
deleted file mode 100644
index 543e4559fa..0000000000
--- a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
+++ /dev/null
@@ -1,202 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""Builder for scanning global indexes."""
-
-from abc import ABC, abstractmethod
-from typing import List, Optional, Collection
-from concurrent.futures import ThreadPoolExecutor, as_completed
-
-from pypaimon.globalindex import GlobalIndexIOMeta, GlobalIndexReader, 
GlobalIndexEvaluator
-from pypaimon.utils.range import Range
-from pypaimon.globalindex.global_index_result import GlobalIndexResult
-from pypaimon.schema.data_types import DataField
-
-
-class GlobalIndexScanBuilder(ABC):
-    """Builder for scanning global indexes."""
-
-    @abstractmethod
-    def with_snapshot(self, snapshot_or_id) -> 'GlobalIndexScanBuilder':
-        """Set the snapshot to scan."""
-        pass
-
-    @abstractmethod
-    def with_partition_predicate(self, partition_predicate) -> 
'GlobalIndexScanBuilder':
-        """Set the partition predicate."""
-        pass
-
-    @abstractmethod
-    def with_row_range(self, row_range: Range) -> 'GlobalIndexScanBuilder':
-        """Set the row range to scan."""
-        pass
-
-    @abstractmethod
-    def build(self) -> 'RowRangeGlobalIndexScanner':
-        """Build the scanner."""
-        pass
-
-    @abstractmethod
-    def shard_list(self) -> List[Range]:
-        """Return sorted and non-overlapping ranges."""
-        pass
-
-    @staticmethod
-    def parallel_scan(
-        ranges: List[Range],
-        builder: 'GlobalIndexScanBuilder',
-        filter_predicate: Optional['Predicate'],
-        vector_search: Optional['VectorSearch'],
-        thread_num: Optional[int] = None
-    ) -> Optional[GlobalIndexResult]:
-        
-        if not ranges:
-            return None
-        
-        scanners = []
-        try:
-            # Build scanners for each range
-            for row_range in ranges:
-                scanner = builder.with_row_range(row_range).build()
-                scanners.append((row_range, scanner))
-            
-            # Execute scans in parallel
-            results: List[Optional[GlobalIndexResult]] = [None] * len(ranges)
-            
-            def scan_range(idx: int, scanner: 'RowRangeGlobalIndexScanner') -> 
tuple:
-                result = scanner.scan(filter_predicate, vector_search)
-                return idx, result
-            
-            with ThreadPoolExecutor(max_workers=thread_num) as executor:
-                futures = {
-                    executor.submit(scan_range, idx, scanner): idx
-                    for idx, (_, scanner) in enumerate(scanners)
-                }
-                
-                for future in as_completed(futures):
-                    idx, result = future.result()
-                    results[idx] = result
-            
-            # Combine results
-            if all(r is None for r in results):
-                return None
-
-            # Find the first non-None result to start combining
-            combined: Optional[GlobalIndexResult] = None
-            for i, row_range in enumerate(ranges):
-                if results[i] is not None:
-                    if combined is None:
-                        combined = results[i]
-                    else:
-                        combined = combined.or_(results[i])
-                else:
-                    # If no result for this range, include the full range
-                    range_result = GlobalIndexResult.from_range(row_range)
-                    if combined is None:
-                        combined = range_result
-                    else:
-                        combined = combined.or_(range_result)
-
-            return combined
-            
-        finally:
-            # Close all scanners
-            for _, scanner in scanners:
-                try:
-                    scanner.close()
-                except Exception:
-                    pass
-
-
-class RowRangeGlobalIndexScanner:
-    """Scanner for shard-based global indexes."""
-
-    def __init__(
-        self,
-        options: dict,
-        fields: list,
-        file_io,
-        index_path: str,
-        row_range: Range,
-        index_entries: list
-    ):
-        self._options = options
-        self._row_range = row_range
-        self._evaluator = self._create_evaluator(fields, file_io, index_path, 
index_entries)
-
-    def _create_evaluator(self, fields, file_io, index_path, index_entries):
-        index_metas = {}
-        for entry in index_entries:
-            global_index_meta = entry.get('global_index_meta')
-            if global_index_meta is None:
-                continue
-            
-            field_id = global_index_meta.index_field_id
-            index_type = entry.get('index_type', 'unknown')
-            
-            if field_id not in index_metas:
-                index_metas[field_id] = {}
-            if index_type not in index_metas[field_id]:
-                index_metas[field_id][index_type] = []
-            
-            io_meta = GlobalIndexIOMeta(
-                file_name=entry.get('file_name'),
-                file_size=entry.get('file_size'),
-                metadata=global_index_meta.index_meta
-            )
-            index_metas[field_id][index_type].append(io_meta)
-        
-        def readers_function(field: DataField) -> 
Collection[GlobalIndexReader]:
-            readers = []
-            if field.id not in index_metas:
-                return readers
-            
-            for index_type, io_metas in index_metas[field.id].items():
-                if index_type == 'btree':
-                    from pypaimon.globalindex.btree import BTreeIndexReader
-                    from pypaimon.globalindex.btree.key_serializer import 
create_serializer
-                    for metadata in io_metas:
-                        reader = BTreeIndexReader(
-                            key_serializer=create_serializer(field.type),
-                            file_io=file_io,
-                            index_path=index_path,
-                            io_meta=metadata
-                        )
-                        readers.append(reader)
-            
-            return readers
-        
-        return GlobalIndexEvaluator(fields, readers_function)
-
-    def scan(
-        self,
-        predicate: Optional['Predicate'],
-        vector_search: Optional['VectorSearch']
-    ) -> Optional[GlobalIndexResult]:
-        """Scan the global index with the given predicate and vector search."""
-        return self._evaluator.evaluate(predicate, vector_search)
-
-    def close(self) -> None:
-        """Close the scanner and release resources."""
-        self._evaluator.close()
-
-    def __enter__(self) -> 'RowRangeGlobalIndexScanner':
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
-        self.close()
diff --git 
a/paimon-python/pypaimon/globalindex/global_index_scan_builder_impl.py 
b/paimon-python/pypaimon/globalindex/global_index_scan_builder_impl.py
deleted file mode 100644
index 67e11d9b70..0000000000
--- a/paimon-python/pypaimon/globalindex/global_index_scan_builder_impl.py
+++ /dev/null
@@ -1,165 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""
-Implementation of GlobalIndexScanBuilder for FileStoreTable.
-"""
-
-from typing import List
-
-from pypaimon.utils.range import Range
-from pypaimon.globalindex.global_index_scan_builder import (
-    GlobalIndexScanBuilder,
-    RowRangeGlobalIndexScanner
-)
-
-
-class GlobalIndexScanBuilderImpl(GlobalIndexScanBuilder):
-
-    def __init__(
-        self,
-        options: dict,
-        row_type: list,
-        file_io: 'FileIO',
-        index_path_factory: 'IndexPathFactory',
-        snapshot_manager: 'SnapshotManager',
-        index_file_handler: 'IndexFileHandler'
-    ):
-        self._options = options
-        self._row_type = row_type
-        self._file_io = file_io
-        self._index_path_factory = index_path_factory
-        self._snapshot_manager = snapshot_manager
-        self._index_file_handler = index_file_handler
-
-        self._snapshot = None
-        self._partition_predicate = None
-        self._row_range = None
-        self._cached_entries = None
-
-    def with_snapshot(self, snapshot_or_id) -> 'GlobalIndexScanBuilderImpl':
-        """Set the snapshot to scan."""
-        if isinstance(snapshot_or_id, int):
-            self._snapshot = self._snapshot_manager.snapshot(snapshot_or_id)
-        else:
-            self._snapshot = snapshot_or_id
-        return self
-
-    def with_partition_predicate(self, partition_predicate) -> 'GlobalIndexScanBuilderImpl':
-        """Set the partition predicate."""
-        self._partition_predicate = partition_predicate
-        return self
-
-    def with_row_range(self, row_range: Range) -> 'GlobalIndexScanBuilderImpl':
-        """Set the row range to scan."""
-        self._row_range = row_range
-        return self
-
-    def shard_list(self) -> List[Range]:
-        entries = self._scan()
-        if not entries:
-            return []
-
-        # Build ranges from index entries grouped by index type
-        index_ranges = {}
-        for entry in entries:
-            global_index_meta = entry.index_file.global_index_meta
-            if global_index_meta is None:
-                continue
-
-            index_type = entry.index_file.index_type
-            start = global_index_meta.row_range_start
-            end = global_index_meta.row_range_end
-
-            if index_type not in index_ranges:
-                index_ranges[index_type] = []
-            index_ranges[index_type].append(Range(start, end))
-
-        if not index_ranges:
-            return []
-
-        # Merge all ranges
-        all_ranges = []
-        for ranges in index_ranges.values():
-            all_ranges.extend(ranges)
-
-        return Range.sort_and_merge_overlap(all_ranges)
-
-    def build(self) -> RowRangeGlobalIndexScanner:
-        """Build the scanner for the current row range."""
-        if self._row_range is None:
-            raise ValueError("rowRange must not be null")
-
-        entries = self._scan()
-        index_path = self._index_path_factory.index_path()
-
-        # Convert entries to dict format for RowRangeGlobalIndexScanner
-        index_entries = []
-        for entry in entries:
-            index_entries.append({
-                'file_name': entry.index_file.file_name,
-                'file_size': entry.index_file.file_size,
-                'index_type': entry.index_file.index_type,
-                'global_index_meta': entry.index_file.global_index_meta
-            })
-
-        return RowRangeGlobalIndexScanner(
-            options=self._options,
-            fields=self._row_type,
-            file_io=self._file_io,
-            index_path=index_path,
-            row_range=self._row_range,
-            index_entries=index_entries
-        )
-
-    def _scan(self) -> List['IndexManifestEntry']:
-        """
-        Scan index manifest entries.
-        """
-        if self._cached_entries is not None:
-            return self._cached_entries
-
-        def entry_filter(entry: 'IndexManifestEntry') -> bool:
-            # Filter by partition predicate
-            if self._partition_predicate is not None:
-                if not self._partition_predicate.test(entry.partition):
-                    return False
-
-            # Filter by row range
-            if self._row_range is not None:
-                global_index_meta = entry.index_file.global_index_meta
-                if global_index_meta is None:
-                    return False
-
-                entry_start = global_index_meta.row_range_start
-                entry_end = global_index_meta.row_range_end
-
-                if not Range.intersect(
-                    entry_start, entry_end,
-                    self._row_range.from_, self._row_range.to
-                ):
-                    return False
-
-            return True
-
-        snapshot = self._snapshot
-        if snapshot is None:
-            snapshot = self._snapshot_manager.get_latest_snapshot()
-
-        self._cached_entries = self._index_file_handler.scan(snapshot, entry_filter)
-        return self._cached_entries
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py b/paimon-python/pypaimon/globalindex/global_index_scanner.py
new file mode 100644
index 0000000000..c3d144811c
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -0,0 +1,162 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Scanner for shard-based global indexes."""
+
+from typing import Collection, Optional
+
+from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.common.predicate import Predicate
+from pypaimon.read.push_down_utils import _get_all_fields
+from pypaimon.schema.data_types import DataField
+from pypaimon.utils.range import Range
+
+
+class GlobalIndexScanner:
+    """Scanner for shard-based global indexes."""
+
+    def __init__(
+        self,
+        options: dict,
+        fields: list,
+        file_io,
+        index_path: str,
+        index_files: Collection['IndexFileMeta']
+    ):
+        self._options = options
+        self._evaluator = self._create_evaluator(fields, file_io, index_path, index_files)
+
+    def _create_evaluator(self, fields, file_io, index_path, index_files):
+        index_metas = {}
+        for index_file in index_files:
+            global_index_meta = index_file.global_index_meta
+            if global_index_meta is None:
+                continue
+
+            field_id = global_index_meta.index_field_id
+            index_type = index_file.index_type
+
+            if field_id not in index_metas:
+                index_metas[field_id] = {}
+            if index_type not in index_metas[field_id]:
+                index_metas[field_id][index_type] = {}
+
+            range_key = Range(global_index_meta.row_range_start, global_index_meta.row_range_end)
+            if range_key not in index_metas[field_id][index_type]:
+                index_metas[field_id][index_type][range_key] = []
+
+            io_meta = GlobalIndexIOMeta(
+                file_name=index_file.file_name,
+                file_size=index_file.file_size,
+                metadata=global_index_meta.index_meta
+            )
+            index_metas[field_id][index_type][range_key].append(io_meta)
+
+        def readers_function(field: DataField) -> Collection[GlobalIndexReader]:
+            return _create_readers(file_io, index_path, index_metas.get(field.id), field)
+
+        return GlobalIndexEvaluator(fields, readers_function)
+
+    @staticmethod
+    def create(table, index_files=None, partition_filter=None, predicate=None):
+        """Create a GlobalIndexScanner.
+
+        Can be called in two ways:
+        1. create(table, index_files) - with explicit index files
+        2. create(table, partition_filter=..., predicate=...) - scan index files from snapshot
+        """
+        from pypaimon.index.index_file_handler import IndexFileHandler
+
+        if index_files is not None:
+            return GlobalIndexScanner(
+                options=table.table_schema.options,
+                fields=table.fields,
+                file_io=table.file_io,
+                index_path=table.path_factory().global_index_path_factory().index_path(),
+                index_files=index_files
+            )
+
+        # Scan index files from snapshot using partition_filter and predicate
+        filter_field_names = _get_all_fields(predicate)
+        filter_field_ids = set()
+        if predicate is not None:
+            for field_item in table.fields:
+                if field_item.name in filter_field_names:
+                    filter_field_ids.add(field_item.id)
+
+        def index_file_filter(entry):
+            if partition_filter is not None:
+                if not partition_filter.test(entry.partition):
+                    return False
+            global_index_meta = entry.index_file.global_index_meta
+            if global_index_meta is None:
+                return False
+            return global_index_meta.index_field_id in filter_field_ids
+
+        from pypaimon.snapshot.snapshot_manager import SnapshotManager
+        snapshot = SnapshotManager(table).get_latest_snapshot()
+        index_file_handler = IndexFileHandler(table=table)
+        entries = index_file_handler.scan(snapshot, index_file_filter)
+        scanned_index_files = [entry.index_file for entry in entries]
+
+        return GlobalIndexScanner(
+            options=table.table_schema.options,
+            fields=table.fields,
+            file_io=table.file_io,
+            index_path=table.path_factory().global_index_path_factory().index_path(),
+            index_files=scanned_index_files
+        )
+
+    def scan(self, predicate: Optional[Predicate]) -> Optional[GlobalIndexResult]:
+        """Scan the global index with the given predicate."""
+        return self._evaluator.evaluate(predicate)
+
+    def close(self):
+        """Close the scanner and release resources."""
+        self._evaluator.close()
+
+    def __enter__(self) -> 'GlobalIndexScanner':
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+        self.close()
+
+
+def _create_readers(file_io, index_path, index_type_metas, field):
+    """Create readers for a specific field from the index metadata."""
+    if index_type_metas is None:
+        return []
+
+    readers = []
+    for index_type, range_metas in index_type_metas.items():
+        if index_type == 'btree':
+            from pypaimon.globalindex.btree import BTreeIndexReader
+            from pypaimon.globalindex.btree.key_serializer import create_serializer
+            for range_key, io_metas in range_metas.items():
+                for metadata in io_metas:
+                    reader = BTreeIndexReader(
+                        key_serializer=create_serializer(field.type),
+                        file_io=file_io,
+                        index_path=index_path,
+                        io_meta=metadata
+                    )
+                    readers.append(reader)
+    return readers
diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py
index be489c9bac..d45a526a69 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -20,7 +20,6 @@ from typing import List, Optional
 
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
-from pypaimon.globalindex import VectorSearch
 from pypaimon.read.table_read import TableRead
 from pypaimon.read.table_scan import TableScan
 from pypaimon.schema.data_types import DataField
@@ -37,7 +36,6 @@ class ReadBuilder:
         self._predicate: Optional[Predicate] = None
         self._projection: Optional[List[str]] = None
         self._limit: Optional[int] = None
-        self._vector_search: Optional['VectorSearch'] = None
 
     def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
         self._predicate = predicate
@@ -51,16 +49,11 @@ class ReadBuilder:
         self._limit = limit
         return self
 
-    def with_vector_search(self, vector_search: VectorSearch) -> 'ReadBuilder':
-        self._vector_search = vector_search
-        return self
-
     def new_scan(self) -> TableScan:
         return TableScan(
             table=self.table,
             predicate=self._predicate,
-            limit=self._limit,
-            vector_search=self._vector_search
+            limit=self._limit
         )
 
     def new_read(self) -> TableRead:
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 1b78ddb0c8..b770b15916 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -167,8 +167,7 @@ class FileScanner:
         table,
         manifest_scanner: Callable[[], List[ManifestFileMeta]],
         predicate: Optional[Predicate] = None,
-        limit: Optional[int] = None,
-        vector_search: Optional['VectorSearch'] = None
+        limit: Optional[int] = None
     ):
         from pypaimon.table.file_store_table import FileStoreTable
 
@@ -177,7 +176,6 @@ class FileScanner:
         self.predicate = predicate
         self.predicate_for_stats = remove_row_id_filter(predicate) if predicate else None
         self.limit = limit
-        self.vector_search = vector_search
 
         self.snapshot_manager = SnapshotManager(table)
         self.manifest_list_manager = ManifestListManager(table)
@@ -299,67 +297,27 @@ class FileScanner:
         return self.read_manifest_entries(manifest_files)
 
     def _eval_global_index(self):
-        from pypaimon.globalindex.global_index_result import GlobalIndexResult
-        from pypaimon.globalindex.global_index_scan_builder import \
-            GlobalIndexScanBuilder
-        from pypaimon.utils.range import Range
-
-        # No filter and no vector search - nothing to evaluate
-        if self.predicate is None and self.vector_search is None:
+        # No filter - nothing to evaluate
+        if self.predicate is None:
             return None
 
         # Check if global index is enabled
         if not self.table.options.global_index_enabled():
             return None
 
-        # Get latest snapshot
-        snapshot = self.snapshot_manager.get_latest_snapshot()
-        if snapshot is None:
-            return None
-
-        # Check if table has store with global index scan builder
-        index_scan_builder = self.table.new_global_index_scan_builder()
-        if index_scan_builder is None:
-            return None
-
-        # Set partition predicate and snapshot
-        index_scan_builder.with_partition_predicate(
-            self.partition_key_predicate
-        ).with_snapshot(snapshot)
-
-        # Get indexed row ranges
-        indexed_row_ranges = index_scan_builder.shard_list()
-        if not indexed_row_ranges:
-            return None
-
-        # Get next row ID from snapshot
-        next_row_id = snapshot.next_row_id
-        if next_row_id is None:
-            return None
-
-        # Calculate non-indexed row ranges
-        non_indexed_row_ranges = Range(0, next_row_id - 1).exclude(indexed_row_ranges)
-
-        # Get thread number from options (can be None, meaning use default)
-        thread_num = self.table.options.global_index_thread_num()
+        from pypaimon.globalindex.global_index_scanner import GlobalIndexScanner
 
-        # Scan global index in parallel
-        result = GlobalIndexScanBuilder.parallel_scan(
-            indexed_row_ranges,
-            index_scan_builder,
-            self.predicate,
-            self.vector_search,
-            thread_num
-        )
-
-        if result is None:
+        try:
+            scanner = GlobalIndexScanner.create(
+                self.table,
+                partition_filter=self.partition_key_predicate,
+                predicate=self.predicate
+            )
+            with scanner:
+                return scanner.scan(self.predicate)
+        except Exception:
             return None
 
-        for row_range in non_indexed_row_ranges:
-            result = result.or_(GlobalIndexResult.from_range(row_range))
-
-        return result
-
     def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
         max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
         manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)]
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 17da0692e8..416ef1717d 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-from typing import Optional, TYPE_CHECKING
+from typing import Optional
 
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -26,9 +26,6 @@ from pypaimon.read.scanner.file_scanner import FileScanner
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 
-if TYPE_CHECKING:
-    from pypaimon.globalindex.vector_search import VectorSearch
-
 
 class TableScan:
     """Implementation of TableScan for native Python reading."""
@@ -37,15 +34,13 @@ class TableScan:
         self,
         table,
         predicate: Optional[Predicate],
-        limit: Optional[int],
-        vector_search: Optional['VectorSearch'] = None
+        limit: Optional[int]
     ):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.limit = limit
-        self.vector_search = vector_search
         self.file_scanner = self._create_file_scanner()
 
     def plan(self) -> Plan:
@@ -115,8 +110,7 @@ class TableScan:
                 self.table,
                 tag_manifest_scanner,
                 self.predicate,
-                self.limit,
-                vector_search=self.vector_search
+                self.limit
             )
 
         def all_manifests():
@@ -127,8 +121,7 @@ class TableScan:
             self.table,
             all_manifests,
             self.predicate,
-            self.limit,
-            vector_search=self.vector_search
+            self.limit
         )
 
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 51a031bd94..9db0fc3a48 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -341,25 +341,6 @@ class FileStoreTable(Table):
     def new_read_builder(self) -> 'ReadBuilder':
         return ReadBuilder(self)
 
-    def new_global_index_scan_builder(self) -> Optional['GlobalIndexScanBuilder']:
-        if not self.options.global_index_enabled():
-            return None
-
-        from pypaimon.globalindex.global_index_scan_builder_impl import (
-            GlobalIndexScanBuilderImpl
-        )
-
-        from pypaimon.index.index_file_handler import IndexFileHandler
-
-        return GlobalIndexScanBuilderImpl(
-            options=self.table_schema.options,
-            row_type=self.fields,
-            file_io=self.file_io,
-            index_path_factory=self.path_factory().global_index_path_factory(),
-            snapshot_manager=self.snapshot_manager(),
-            index_file_handler=IndexFileHandler(table=self)
-        )
-
     def new_stream_read_builder(self) -> 'StreamReadBuilder':
         return StreamReadBuilder(self)
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index 029d6cd378..b45475ff0c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -64,7 +64,6 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
         .scanEntries()
         .asScala
         .filter(_.indexFile().indexType() == "bitmap")
-      table.store().newGlobalIndexScanBuilder().shardList()
       assert(bitmapEntries.nonEmpty)
       val totalRowCount = bitmapEntries.map(_.indexFile().rowCount()).sum
       assert(totalRowCount == 100000L)
@@ -359,7 +358,6 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
       .scanEntries()
       .asScala
       .filter(_.indexFile().indexType() == "btree")
-    table.store().newGlobalIndexScanBuilder().shardList()
     assert(btreeEntries.nonEmpty)
 
     // 1. assert total row count

Reply via email to