Copilot commented on code in PR #6716:
URL: https://github.com/apache/paimon/pull/6716#discussion_r2587959780


##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.StoredFields;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnByteVectorQuery;
+import org.apache.lucene.search.KnnFloatVectorQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.MMapDirectory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Vector global index reader using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW 
graph for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final List<IndexSearcher> searchers;
+    private final List<Directory> directories;
+    private final List<java.nio.file.Path> tempDirs;
+
+    public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        this.searchers = new ArrayList<>();
+        this.directories = new ArrayList<>();
+        this.tempDirs = new ArrayList<>();
+        loadIndices(fileReader, files);
+    }
+
+    /**
+     * Search for similar vectors using Lucene KNN search.
+     *
+     * @param query query vector
+     * @param k number of results
+     * @return global index result containing row IDs
+     */
+    public GlobalIndexResult search(float[] query, int k) {
+        KnnFloatVectorQuery knnQuery = new 
KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    public GlobalIndexResult search(byte[] query, int k) {
+        KnnByteVectorQuery knnQuery = new 
KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Close readers
+        for (IndexSearcher searcher : searchers) {
+            searcher.getIndexReader().close();
+        }
+        searchers.clear();
+
+        // Close directories
+        for (Directory directory : directories) {
+            directory.close();
+        }
+        directories.clear();
+
+        // Clean up temp directories
+        for (java.nio.file.Path tempDir : tempDirs) {
+            deleteDirectory(tempDir);
+        }
+        tempDirs.clear();
+    }
+
+    private GlobalIndexResult search(Query query, int k) {
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        for (IndexSearcher searcher : searchers) {
+            try {
+                // Execute search
+                TopDocs topDocs = searcher.search(query, k);
+                StoredFields storedFields = searcher.storedFields();
+                Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD);
+                // Collect row IDs from results
+                for (org.apache.lucene.search.ScoreDoc scoreDoc : 
topDocs.scoreDocs) {
+                    float rawScore = scoreDoc.score;

Review Comment:
   Unused variable `rawScore`. The score is retrieved from `scoreDoc.score` but 
never used. If the score is not needed for functionality, this line should be 
removed to avoid confusion.
   ```suggestion
   
   ```



##########
paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link VectorGlobalIndexWriter} and {@link 
VectorGlobalIndexReader}. */
+public class VectorGlobalIndexTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private FileIO fileIO;
+    private Path indexPath;
+    private DataType vectorType;
+
+    @BeforeEach
+    public void setup() {
+        fileIO = new LocalFileIO();
+        indexPath = new Path(tempDir.toString());
+        vectorType = new ArrayType(new FloatType());
+    }
+
+    @AfterEach
+    public void cleanup() throws IOException {
+        if (fileIO != null) {
+            fileIO.delete(indexPath, true);
+        }
+    }
+
+    private GlobalIndexFileWriter createFileWriter(Path path) {
+        return new GlobalIndexFileWriter() {
+            @Override
+            public String newFileName(String prefix) {
+                return prefix + "-" + UUID.randomUUID();
+            }
+
+            @Override
+            public OutputStream newOutputStream(String fileName) throws 
IOException {
+                return fileIO.newOutputStream(new Path(path, fileName), false);
+            }
+        };
+    }
+
+    private GlobalIndexFileReader createFileReader(Path path) {
+        return fileName -> fileIO.newInputStream(new Path(path, fileName));
+    }
+
+    @Test
+    public void testWriteAndReadVectorIndex() throws IOException {
+        Options options = createDefaultOptions();
+        int dimension = 128;
+        int numVectors = 100;
+
+        // Create writer and write vectors
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+        for (int i = 0; i < numVectors; i++) {
+            writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+        }
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+
+        GlobalIndexWriter.ResultEntry result = results.get(0);
+        assertThat(result.fileName()).isNotNull();
+        assertThat(result.rowRange()).isEqualTo(new Range(0, numVectors - 1));
+
+        // Create reader and verify
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        metas.add(
+                new GlobalIndexIOMeta(
+                        result.fileName(),
+                        fileIO.getFileSize(new Path(indexPath, 
result.fileName())),
+                        result.rowRange(),
+                        result.meta()));
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+        // Test search - query with first vector should return itself
+        GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5);
+        assertThat(searchResult).isNotNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testSearchSimilarVectors() throws IOException {
+        Options options = createDefaultOptions();
+        int dimension = 64;
+        options.setInteger("vector.dim", dimension); // Set correct dimension
+        int numVectors = 50;
+
+        // Create vectors with known similarities
+        float[] baseVector = new float[dimension];
+        for (int i = 0; i < dimension; i++) {
+            baseVector[i] = 1.0f;
+        }
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        // Write base vector and similar variants
+        writer.write(new FloatVectorIndex(0, baseVector));
+
+        for (int i = 1; i < numVectors; i++) {
+            float[] variant = baseVector.clone();
+            // Add small noise
+            variant[i % dimension] += (i % 2 == 0 ? 0.1f : -0.1f);
+            writer.write(new FloatVectorIndex(i, variant));
+        }
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+
+        GlobalIndexWriter.ResultEntry result = results.get(0);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        metas.add(
+                new GlobalIndexIOMeta(
+                        result.fileName(),
+                        fileIO.getFileSize(new Path(indexPath, 
result.fileName())),
+                        result.rowRange(),
+                        result.meta()));
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+        // Search with base vector should return similar vectors
+        GlobalIndexResult searchResult = reader.search(baseVector, 10);
+        assertThat(searchResult).isNotNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testDifferentSimilarityFunctions() throws IOException {
+        int dimension = 32;
+        int numVectors = 20;
+
+        String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"};
+
+        for (String metric : metrics) {
+            Options options = createDefaultOptions();
+            options.setString("vector.metric", metric);
+            options.setInteger("vector.dim", dimension); // Set correct 
dimension
+
+            Path metricIndexPath = new Path(indexPath, metric.toLowerCase());
+            GlobalIndexFileWriter fileWriter = 
createFileWriter(metricIndexPath);
+            VectorGlobalIndexWriter writer =
+                    new VectorGlobalIndexWriter(fileWriter, vectorType, 
options);
+
+            List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+            for (int i = 0; i < numVectors; i++) {
+                writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+            }
+
+            List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+            assertThat(results).hasSize(1);
+
+            GlobalIndexWriter.ResultEntry result = results.get(0);
+            GlobalIndexFileReader fileReader = 
createFileReader(metricIndexPath);
+            List<GlobalIndexIOMeta> metas = new ArrayList<>();
+            metas.add(
+                    new GlobalIndexIOMeta(
+                            result.fileName(),
+                            fileIO.getFileSize(new Path(metricIndexPath, 
result.fileName())),
+                            result.rowRange(),
+                            result.meta()));
+
+            VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+            // Verify search works with this metric
+            GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
3);
+            assertThat(searchResult).isNotNull();
+
+            reader.close();
+        }
+    }
+
+    @Test
+    public void testDifferentDimensions() throws IOException {
+        int[] dimensions = {8, 32, 128, 256};
+
+        for (int dimension : dimensions) {
+            Options options = createDefaultOptions();
+            options.setInteger("vector.dim", dimension);
+
+            Path dimIndexPath = new Path(indexPath, "dim_" + dimension);
+            GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath);
+            VectorGlobalIndexWriter writer =
+                    new VectorGlobalIndexWriter(fileWriter, vectorType, 
options);
+
+            int numVectors = 10;
+            List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+            for (int i = 0; i < numVectors; i++) {
+                writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+            }
+
+            List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+            assertThat(results).hasSize(1);
+
+            GlobalIndexWriter.ResultEntry result = results.get(0);
+            GlobalIndexFileReader fileReader = createFileReader(dimIndexPath);
+            List<GlobalIndexIOMeta> metas = new ArrayList<>();
+            metas.add(
+                    new GlobalIndexIOMeta(
+                            result.fileName(),
+                            fileIO.getFileSize(new Path(dimIndexPath, 
result.fileName())),
+                            result.rowRange(),
+                            result.meta()));
+
+            VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+            // Verify search works with this dimension
+            GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
5);
+            assertThat(searchResult).isNotNull();
+
+            reader.close();
+        }
+    }
+
+    @Test
+    public void testDimensionMismatch() throws IOException {
+        Options options = createDefaultOptions();
+        options.setInteger("vector.dim", 64);
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        // Try to write vector with wrong dimension
+        float[] wrongDimVector = new float[32]; // Wrong dimension
+        assertThatThrownBy(() -> writer.write(new FloatVectorIndex(0, 
wrongDimVector)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("dimension mismatch");
+    }
+
+    @Test
+    public void testEmptyIndex() throws IOException {
+        Options options = createDefaultOptions();
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).isEmpty();
+    }
+
+    @Test
+    public void testHNSWParameters() throws IOException {
+        Options options = createDefaultOptions();
+        options.setInteger("vector.m", 32); // Larger M for better recall
+        options.setInteger("vector.ef-construction", 200); // Larger ef for 
better quality
+
+        int dimension = 64;
+        options.setInteger("vector.dim", dimension); // Set correct dimension
+        int numVectors = 50;
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+        for (int i = 0; i < numVectors; i++) {
+            writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+        }
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+
+        GlobalIndexWriter.ResultEntry result = results.get(0);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        metas.add(
+                new GlobalIndexIOMeta(
+                        result.fileName(),
+                        fileIO.getFileSize(new Path(indexPath, 
result.fileName())),
+                        result.rowRange(),
+                        result.meta()));
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+        // Verify search works with custom HNSW parameters
+        GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5);
+        assertThat(searchResult).isNotNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testLargeK() throws IOException {
+        Options options = createDefaultOptions();
+        int dimension = 32;
+        options.setInteger("vector.dim", dimension); // Set correct dimension
+        int numVectors = 100;
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+        for (int i = 0; i < numVectors; i++) {
+            writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+        }
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+
+        GlobalIndexWriter.ResultEntry result = results.get(0);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        metas.add(
+                new GlobalIndexIOMeta(
+                        result.fileName(),
+                        fileIO.getFileSize(new Path(indexPath, 
result.fileName())),
+                        result.rowRange(),
+                        result.meta()));
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+        // Search with k larger than number of vectors
+        GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
200);
+        assertThat(searchResult).isNotNull();
+        assertThat(searchResult).isNotNull();

Review Comment:
   Duplicate assertion: in `testLargeK`, line 373 repeats the assertion already
   made on line 372 (`assertThat(searchResult).isNotNull()`). The redundant check
   should be removed.
   ```suggestion
   
   ```



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.Range;
+
+import org.apache.lucene.codecs.KnnVectorsFormat;
+import org.apache.lucene.codecs.lucene912.Lucene912Codec;
+import 
org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MMapDirectory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Vector global index writer using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW 
algorithm for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexWriter implements GlobalIndexWriter {
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final DataType fieldType;
+    private final VectorIndexOptions vectorOptions;
+    private final VectorSimilarityFunction similarityFunction;
+    private final int sizePerIndex;
+
+    private final List<VectorIndex> vectors;
+
+    public VectorGlobalIndexWriter(
+            GlobalIndexFileWriter fileWriter, DataType fieldType, Options 
options) {
+        checkArgument(
+                fieldType instanceof ArrayType,
+                "Vector field type must be ARRAY, but was: " + fieldType);
+        this.fileWriter = fileWriter;
+        this.fieldType = fieldType;
+        this.vectors = new ArrayList<>();
+        this.vectorOptions = new VectorIndexOptions(options);
+        this.similarityFunction = parseMetricToLucene(vectorOptions.metric());
+        this.sizePerIndex = vectorOptions.sizePerIndex();
+    }
+
+    @Override
+    public void write(Object key) {
+        if (key instanceof FloatVectorIndex) {
+            FloatVectorIndex vectorKey = (FloatVectorIndex) key;
+            float[] vector = vectorKey.vector();
+
+            checkArgument(
+                    vector.length == vectorOptions.dimension(),
+                    "Vector dimension mismatch: expected "
+                            + vectorOptions.dimension()
+                            + ", but got "
+                            + vector.length);
+
+            vectors.add(vectorKey);
+        } else if (key instanceof ByteVectorIndex) {
+            ByteVectorIndex vectorKey = (ByteVectorIndex) key;
+            byte[] byteVector = vectorKey.vector();
+
+            checkArgument(
+                    byteVector.length == vectorOptions.dimension(),
+                    "Vector dimension mismatch: expected "
+                            + vectorOptions.dimension()
+                            + ", but got "
+                            + byteVector.length);
+
+            vectors.add(vectorKey);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported index type: " + key.getClass().getName());
+        }
+    }
+
+    @Override
+    public List<ResultEntry> finish() {
+        try {
+            if (vectors.isEmpty()) {
+                return new ArrayList<>();
+            }
+
+            List<ResultEntry> results = new ArrayList<>();
+
+            // Split vectors into batches if size exceeds sizePerIndex
+            int totalVectors = vectors.size();
+            int numBatches = (int) Math.ceil((double) totalVectors / 
sizePerIndex);
+
+            for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) {
+                int startIdx = batchIndex * sizePerIndex;
+                int endIdx = Math.min(startIdx + sizePerIndex, totalVectors);
+                List<VectorIndex> batchVectors = vectors.subList(startIdx, 
endIdx);
+
+                // Build index
+                byte[] indexBytes =
+                        buildIndex(
+                                batchVectors,
+                                this.vectorOptions.m(),
+                                this.vectorOptions.efConstruction());
+
+                // Create metadata
+                VectorIndexMetadata metadata =
+                        new VectorIndexMetadata(
+                                vectorOptions.dimension(),
+                                vectorOptions.metric(),
+                                vectorOptions.m(),
+                                vectorOptions.efConstruction());
+                byte[] metaBytes = 
VectorIndexMetadata.serializeMetadata(metadata);
+
+                // Write to file
+                String fileName = 
fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER);
+                try (OutputStream out = fileWriter.newOutputStream(fileName)) {
+                    out.write(indexBytes);
+                }
+                long minRowIdInBatch = batchVectors.get(0).rowId();
+                long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 
1).rowId();
+                results.add(
+                        ResultEntry.of(
+                                fileName, metaBytes, new 
Range(minRowIdInBatch, maxRowIdInBatch)));
+            }
+
+            return results;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write vector global index", 
e);
+        }
+    }
+
+    private VectorSimilarityFunction parseMetricToLucene(String metric) {
+        switch (metric.toUpperCase()) {
+            case "COSINE":
+                return VectorSimilarityFunction.COSINE;
+            case "DOT_PRODUCT":
+                return VectorSimilarityFunction.DOT_PRODUCT;
+            case "EUCLIDEAN":
+                return VectorSimilarityFunction.EUCLIDEAN;
+            case "MAX_INNER_PRODUCT":
+                return VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT;
+            default:
+                throw new IllegalArgumentException("Unsupported metric: " + 
metric);
+        }
+    }
+
+    private byte[] buildIndex(List<VectorIndex> batchVectors, int m, int 
efConstruction)
+            throws IOException {
+        // Create temporary directory for MMap
+        java.nio.file.Path tempDir =
+                java.nio.file.Files.createTempDirectory(
+                        fileWriter.newFileName("paimon-vector-index"));
+        Directory directory = new MMapDirectory(tempDir);
+
+        try {
+            // Configure index writer
+            IndexWriterConfig config = getIndexWriterConfig(m, efConstruction);
+
+            try (IndexWriter writer = new IndexWriter(directory, config)) {
+                // Add each vector as a document
+                for (VectorIndex vectorIndex : batchVectors) {
+                    Document doc = new Document();
+
+                    // Add KNN vector field
+                    doc.add(vectorIndex.indexableField(similarityFunction));
+
+                    // Store row ID
+                    doc.add(vectorIndex.rowIdStoredField());
+
+                    writer.addDocument(doc);
+                }
+
+                // Commit changes
+                writer.commit();
+            }
+
+            // Serialize directory to byte array
+            return serializeDirectory(directory);
+        } finally {
+            // Clean up
+            directory.close();
+            deleteDirectory(tempDir);
+        }
+    }
+
+    private static IndexWriterConfig getIndexWriterConfig(int m, int 
efConstruction) {
+        IndexWriterConfig config = new IndexWriterConfig();
+        config.setRAMBufferSizeMB(256); // Increase buffer for better 
performance
+        config.setCodec(
+                new Lucene912Codec(Lucene912Codec.Mode.BEST_SPEED) {
+                    @Override
+                    public KnnVectorsFormat getKnnVectorsFormatForField(String 
field) {
+                        return new Lucene99HnswScalarQuantizedVectorsFormat(m, 
efConstruction);
+                    }
+                });
+        return config;
+    }
+
+    private void deleteDirectory(java.nio.file.Path path) throws IOException {
+        if (java.nio.file.Files.exists(path)) {
+            java.nio.file.Files.walk(path)
+                    .sorted(java.util.Comparator.reverseOrder())
+                    .forEach(
+                            p -> {
+                                try {
+                                    java.nio.file.Files.delete(p);
+                                } catch (IOException e) {
+                                    // Ignore cleanup errors
+                                }

Review Comment:
   Silently ignoring cleanup errors: an `IOException` thrown by `Files.delete`
   during directory cleanup is caught and discarded without logging. This could
   hide issues such as permission problems or disk errors. Consider logging the
   exception, at least at debug level, to aid troubleshooting.



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.StoredFields;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnByteVectorQuery;
+import org.apache.lucene.search.KnnFloatVectorQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.MMapDirectory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Vector global index reader using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW 
graph for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final List<IndexSearcher> searchers;
+    private final List<Directory> directories;
+    private final List<java.nio.file.Path> tempDirs;
+
+    public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        this.searchers = new ArrayList<>();
+        this.directories = new ArrayList<>();
+        this.tempDirs = new ArrayList<>();
+        loadIndices(fileReader, files);
+    }
+
+    /**
+     * Search for similar vectors using Lucene KNN search.
+     *
+     * @param query query vector
+     * @param k number of results
+     * @return global index result containing row IDs
+     */
+    public GlobalIndexResult search(float[] query, int k) {
+        KnnFloatVectorQuery knnQuery = new 
KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    public GlobalIndexResult search(byte[] query, int k) {
+        KnnByteVectorQuery knnQuery = new 
KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Close readers
+        for (IndexSearcher searcher : searchers) {
+            searcher.getIndexReader().close();
+        }
+        searchers.clear();
+
+        // Close directories
+        for (Directory directory : directories) {
+            directory.close();
+        }
+        directories.clear();
+
+        // Clean up temp directories
+        for (java.nio.file.Path tempDir : tempDirs) {
+            deleteDirectory(tempDir);
+        }
+        tempDirs.clear();
+    }
+
+    private GlobalIndexResult search(Query query, int k) {
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        for (IndexSearcher searcher : searchers) {
+            try {
+                // Execute search
+                TopDocs topDocs = searcher.search(query, k);
+                StoredFields storedFields = searcher.storedFields();
+                Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD);
+                // Collect row IDs from results
+                for (org.apache.lucene.search.ScoreDoc scoreDoc : 
topDocs.scoreDocs) {
+                    float rawScore = scoreDoc.score;
+                    Document doc = storedFields.document(scoreDoc.doc, 
fieldsToLoad);
+                    long rowId = 
doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue();
+                    roaringBitmap64.add(rowId);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to search vector index", e);
+            }
+        }
+
+        return GlobalIndexResult.create(() -> roaringBitmap64);
+    }
+
+    private void loadIndices(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        for (GlobalIndexIOMeta meta : files) {
+            try (SeekableInputStream in = 
fileReader.getInputStream(meta.fileName())) {
+                byte[] indexBytes = new byte[(int) meta.fileSize()];
+                int totalRead = 0;
+                while (totalRead < indexBytes.length) {
+                    int read = in.read(indexBytes, totalRead, 
indexBytes.length - totalRead);
+                    if (read == -1) {
+                        throw new IOException("Unexpected end of stream");
+                    }
+                    totalRead += read;
+                }
+
+                Directory directory = deserializeDirectory(indexBytes);
+                directories.add(directory);
+
+                IndexReader reader = DirectoryReader.open(directory);
+                IndexSearcher searcher = new IndexSearcher(reader);
+                searchers.add(searcher);
+            }
+        }
+    }
+
+    private Directory deserializeDirectory(byte[] data) throws IOException {
+        // Create temporary directory for MMap
+        Path tempDir = Files.createTempDirectory("paimon-vector-read" + 
UUID.randomUUID());
+        tempDirs.add(tempDir);
+        Directory directory = new MMapDirectory(tempDir);
+
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+
+        // Read number of files
+        int numFiles = buffer.getInt();
+
+        for (int i = 0; i < numFiles; i++) {
+            // Read file name
+            int nameLength = buffer.getInt();
+            byte[] nameBytes = new byte[nameLength];
+            buffer.get(nameBytes);
+            String fileName = new String(nameBytes, StandardCharsets.UTF_8);
+
+            // Read file content
+            long fileLength = buffer.getLong();
+            byte[] fileContent = new byte[(int) fileLength];
+            buffer.get(fileContent);
+
+            // Write to directory
+            try (IndexOutput output = directory.createOutput(fileName, null)) {
+                output.writeBytes(fileContent, 0, fileContent.length);
+            }
+        }
+
+        return directory;
+    }
+
+    private void deleteDirectory(java.nio.file.Path path) throws IOException {
+        if (java.nio.file.Files.exists(path)) {
+            java.nio.file.Files.walk(path)
+                    .sorted(java.util.Comparator.reverseOrder())
+                    .forEach(
+                            p -> {
+                                try {
+                                    java.nio.file.Files.delete(p);
+                                } catch (IOException e) {
+                                    // Ignore cleanup errors
+                                }

Review Comment:
   Silently ignoring cleanup errors: IOException during directory cleanup is 
caught and ignored without logging. This could hide issues like permission 
problems or disk errors. Consider logging the exception at least at debug level 
to aid troubleshooting.



##########
paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link VectorGlobalIndexWriter} and {@link 
VectorGlobalIndexReader}. */
+public class VectorGlobalIndexTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private FileIO fileIO;
+    private Path indexPath;
+    private DataType vectorType;
+
+    @BeforeEach
+    public void setup() {
+        fileIO = new LocalFileIO();
+        indexPath = new Path(tempDir.toString());
+        vectorType = new ArrayType(new FloatType());
+    }
+
+    @AfterEach
+    public void cleanup() throws IOException {
+        if (fileIO != null) {
+            fileIO.delete(indexPath, true);
+        }
+    }
+
+    private GlobalIndexFileWriter createFileWriter(Path path) {
+        return new GlobalIndexFileWriter() {
+            @Override
+            public String newFileName(String prefix) {
+                return prefix + "-" + UUID.randomUUID();
+            }
+
+            @Override
+            public OutputStream newOutputStream(String fileName) throws 
IOException {
+                return fileIO.newOutputStream(new Path(path, fileName), false);
+            }
+        };
+    }
+
+    private GlobalIndexFileReader createFileReader(Path path) {
+        return fileName -> fileIO.newInputStream(new Path(path, fileName));
+    }
+
+    @Test
+    public void testWriteAndReadVectorIndex() throws IOException {
+        Options options = createDefaultOptions();
+        int dimension = 128;
+        int numVectors = 100;
+
+        // Create writer and write vectors
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+        for (int i = 0; i < numVectors; i++) {
+            writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+        }
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+
+        GlobalIndexWriter.ResultEntry result = results.get(0);
+        assertThat(result.fileName()).isNotNull();
+        assertThat(result.rowRange()).isEqualTo(new Range(0, numVectors - 1));
+
+        // Create reader and verify
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        metas.add(
+                new GlobalIndexIOMeta(
+                        result.fileName(),
+                        fileIO.getFileSize(new Path(indexPath, 
result.fileName())),
+                        result.rowRange(),
+                        result.meta()));
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+        // Test search - query with first vector should return itself
+        GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5);
+        assertThat(searchResult).isNotNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testSearchSimilarVectors() throws IOException {
+        Options options = createDefaultOptions();
+        int dimension = 64;
+        options.setInteger("vector.dim", dimension); // Set correct dimension
+        int numVectors = 50;
+
+        // Create vectors with known similarities
+        float[] baseVector = new float[dimension];
+        for (int i = 0; i < dimension; i++) {
+            baseVector[i] = 1.0f;
+        }
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        // Write base vector and similar variants
+        writer.write(new FloatVectorIndex(0, baseVector));
+
+        for (int i = 1; i < numVectors; i++) {
+            float[] variant = baseVector.clone();
+            // Add small noise
+            variant[i % dimension] += (i % 2 == 0 ? 0.1f : -0.1f);
+            writer.write(new FloatVectorIndex(i, variant));
+        }
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+
+        GlobalIndexWriter.ResultEntry result = results.get(0);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        metas.add(
+                new GlobalIndexIOMeta(
+                        result.fileName(),
+                        fileIO.getFileSize(new Path(indexPath, 
result.fileName())),
+                        result.rowRange(),
+                        result.meta()));
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+        // Search with base vector should return similar vectors
+        GlobalIndexResult searchResult = reader.search(baseVector, 10);
+        assertThat(searchResult).isNotNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testDifferentSimilarityFunctions() throws IOException {
+        int dimension = 32;
+        int numVectors = 20;
+
+        String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"};
+
+        for (String metric : metrics) {
+            Options options = createDefaultOptions();
+            options.setString("vector.metric", metric);
+            options.setInteger("vector.dim", dimension); // Set correct 
dimension
+
+            Path metricIndexPath = new Path(indexPath, metric.toLowerCase());
+            GlobalIndexFileWriter fileWriter = 
createFileWriter(metricIndexPath);
+            VectorGlobalIndexWriter writer =
+                    new VectorGlobalIndexWriter(fileWriter, vectorType, 
options);
+
+            List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+            for (int i = 0; i < numVectors; i++) {
+                writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+            }
+
+            List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+            assertThat(results).hasSize(1);
+
+            GlobalIndexWriter.ResultEntry result = results.get(0);
+            GlobalIndexFileReader fileReader = 
createFileReader(metricIndexPath);
+            List<GlobalIndexIOMeta> metas = new ArrayList<>();
+            metas.add(
+                    new GlobalIndexIOMeta(
+                            result.fileName(),
+                            fileIO.getFileSize(new Path(metricIndexPath, 
result.fileName())),
+                            result.rowRange(),
+                            result.meta()));
+
+            VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+            // Verify search works with this metric
+            GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
3);
+            assertThat(searchResult).isNotNull();
+
+            reader.close();
+        }
+    }
+
+    @Test
+    public void testDifferentDimensions() throws IOException {
+        int[] dimensions = {8, 32, 128, 256};
+
+        for (int dimension : dimensions) {
+            Options options = createDefaultOptions();
+            options.setInteger("vector.dim", dimension);
+
+            Path dimIndexPath = new Path(indexPath, "dim_" + dimension);
+            GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath);
+            VectorGlobalIndexWriter writer =
+                    new VectorGlobalIndexWriter(fileWriter, vectorType, 
options);
+
+            int numVectors = 10;
+            List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+            for (int i = 0; i < numVectors; i++) {
+                writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+            }
+
+            List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+            assertThat(results).hasSize(1);
+
+            GlobalIndexWriter.ResultEntry result = results.get(0);
+            GlobalIndexFileReader fileReader = createFileReader(dimIndexPath);
+            List<GlobalIndexIOMeta> metas = new ArrayList<>();
+            metas.add(
+                    new GlobalIndexIOMeta(
+                            result.fileName(),
+                            fileIO.getFileSize(new Path(dimIndexPath, 
result.fileName())),
+                            result.rowRange(),
+                            result.meta()));
+
+            VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+            // Verify search works with this dimension
+            GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
5);
+            assertThat(searchResult).isNotNull();
+
+            reader.close();
+        }
+    }
+
+    @Test
+    public void testDimensionMismatch() throws IOException {
+        Options options = createDefaultOptions();
+        options.setInteger("vector.dim", 64);
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        // Try to write vector with wrong dimension
+        float[] wrongDimVector = new float[32]; // Wrong dimension
+        assertThatThrownBy(() -> writer.write(new FloatVectorIndex(0, 
wrongDimVector)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("dimension mismatch");
+    }
+
+    @Test
+    public void testEmptyIndex() throws IOException {
+        Options options = createDefaultOptions();
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).isEmpty();
+    }
+
+    @Test
+    public void testHNSWParameters() throws IOException {
+        Options options = createDefaultOptions();
+        options.setInteger("vector.m", 32); // Larger M for better recall
+        options.setInteger("vector.ef-construction", 200); // Larger ef for 
better quality
+
+        int dimension = 64;
+        options.setInteger("vector.dim", dimension); // Set correct dimension
+        int numVectors = 50;
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+        for (int i = 0; i < numVectors; i++) {
+            writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+        }
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+
+        GlobalIndexWriter.ResultEntry result = results.get(0);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        metas.add(
+                new GlobalIndexIOMeta(
+                        result.fileName(),
+                        fileIO.getFileSize(new Path(indexPath, 
result.fileName())),
+                        result.rowRange(),
+                        result.meta()));
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+        // Verify search works with custom HNSW parameters
+        GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5);
+        assertThat(searchResult).isNotNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testLargeK() throws IOException {
+        Options options = createDefaultOptions();
+        int dimension = 32;
+        options.setInteger("vector.dim", dimension); // Set correct dimension
+        int numVectors = 100;
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        VectorGlobalIndexWriter writer =
+                new VectorGlobalIndexWriter(fileWriter, vectorType, options);
+
+        List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+        for (int i = 0; i < numVectors; i++) {
+            writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+        }
+
+        List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+
+        GlobalIndexWriter.ResultEntry result = results.get(0);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        List<GlobalIndexIOMeta> metas = new ArrayList<>();
+        metas.add(
+                new GlobalIndexIOMeta(
+                        result.fileName(),
+                        fileIO.getFileSize(new Path(indexPath, 
result.fileName())),
+                        result.rowRange(),
+                        result.meta()));
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+        // Search with k larger than number of vectors
+        GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
200);
+        assertThat(searchResult).isNotNull();
+        assertThat(searchResult).isNotNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testEndToEndMultiShardScenario() throws IOException {
+        Options options = createDefaultOptions();
+        int dimension = 128;
+        options.setInteger("vector.dim", dimension);
+        options.setString("vector.metric", "COSINE");
+        options.setInteger("vector.m", 24);
+        options.setInteger("vector.ef-construction", 150);
+
+        // Simulate a multi-shard scenario with incremental writes
+        int totalVectors = 300;
+        int shardsCount = 3;
+        int vectorsPerShard = totalVectors / shardsCount;
+
+        List<GlobalIndexIOMeta> allMetas = new ArrayList<>();
+        List<float[]> allTestVectors = generateRandomVectors(totalVectors, 
dimension);
+
+        // Write vectors in multiple shards (simulating incremental indexing)
+        for (int shardIdx = 0; shardIdx < shardsCount; shardIdx++) {
+            Path shardPath = new Path(indexPath, "shard_" + shardIdx);
+            GlobalIndexFileWriter fileWriter = createFileWriter(shardPath);
+            VectorGlobalIndexWriter writer =
+                    new VectorGlobalIndexWriter(fileWriter, vectorType, 
options);
+
+            int startIdx = shardIdx * vectorsPerShard;
+            int endIdx = Math.min(startIdx + vectorsPerShard, totalVectors);
+
+            // Write vectors for this shard
+            for (int i = startIdx; i < endIdx; i++) {
+                writer.write(new FloatVectorIndex(i, allTestVectors.get(i)));
+            }
+
+            List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+            assertThat(results).hasSize(1);
+
+            GlobalIndexWriter.ResultEntry result = results.get(0);
+            allMetas.add(
+                    new GlobalIndexIOMeta(
+                            result.fileName(),
+                            fileIO.getFileSize(new Path(shardPath, 
result.fileName())),
+                            result.rowRange(),
+                            result.meta()));
+        }
+
+        // Create a single reader for all shards
+        GlobalIndexFileReader fileReader =
+                new GlobalIndexFileReader() {
+                    @Override
+                    public org.apache.paimon.fs.SeekableInputStream 
getInputStream(String fileName)
+                            throws IOException {
+                        // Try to find the file in any shard directory
+                        for (int shardIdx = 0; shardIdx < shardsCount; 
shardIdx++) {
+                            Path shardPath = new Path(indexPath, "shard_" + 
shardIdx);
+                            Path filePath = new Path(shardPath, fileName);
+                            if (fileIO.exists(filePath)) {
+                                return fileIO.newInputStream(filePath);
+                            }
+                        }
+                        throw new IOException("File not found: " + fileName);
+                    }
+                };
+
+        VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, allMetas);
+
+        // Test 1: Search with various k values
+        int[] kValues = {1, 5, 10, 20, 50};
+        for (int k : kValues) {
+            GlobalIndexResult searchResult = 
reader.search(allTestVectors.get(0), k);
+            assertThat(searchResult).isNotNull();
+            assertThat(containsRowId(searchResult, 0)).isTrue();
+        }
+
+        // Test 2: Search with query vectors from different shards
+        for (int shardIdx = 0; shardIdx < shardsCount; shardIdx++) {
+            int queryIdx = shardIdx * vectorsPerShard + 5;
+            GlobalIndexResult searchResult = 
reader.search(allTestVectors.get(queryIdx), 10);
+            assertThat(searchResult).isNotNull();
+            assertThat(containsRowId(searchResult, queryIdx))
+                    .as("Search result should contain the query vector's own 
ID")
+                    .isTrue();
+        }
+
+        // Test 3: Create a known similar vector and verify top result
+        float[] originalVector = allTestVectors.get(50);
+        float[] slightlyModifiedVector = originalVector.clone();
+        // Add minimal noise to create a very similar vector
+        for (int i = 0; i < dimension; i++) {
+            slightlyModifiedVector[i] += (i % 2 == 0 ? 0.001f : -0.001f);
+        }
+        normalize(slightlyModifiedVector);
+
+        GlobalIndexResult similarSearchResult = 
reader.search(slightlyModifiedVector, 5);
+        assertThat(similarSearchResult).isNotNull();
+        assertThat(containsRowId(similarSearchResult, 50))
+                .as("Search with very similar vector should find the original")
+                .isTrue();
+
+        // Test 4: Verify search quality with a synthetic cluster
+        // Create a cluster center
+        float[] clusterCenter = new float[dimension];
+        Random random = new Random(123);
+        for (int i = 0; i < dimension; i++) {
+            clusterCenter[i] = random.nextFloat() * 2 - 1;
+        }
+        normalize(clusterCenter);
+
+        GlobalIndexResult clusterSearchResult = reader.search(clusterCenter, 
20);
+        assertThat(clusterSearchResult).isNotNull();
+        
assertThat(clusterSearchResult.results().iterator().hasNext()).isTrue();
+
+        // Test 5: Edge case - search with k larger than total vectors
+        GlobalIndexResult largeKResult = reader.search(allTestVectors.get(10), 
totalVectors * 2);
+        assertThat(largeKResult).isNotNull();
+
+        // Test 6: Multiple consecutive searches (stress test)
+        for (int i = 0; i < 20; i++) {
+            int randomIdx = random.nextInt(totalVectors);

Review Comment:
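   Reuse of the `random` variable across test sections. Line 478 creates a 
`Random` instance with seed 123 for Test 4, and line 494 uses that same 
`random` variable (lowercase) again for Test 6. Both lines are in the same 
method scope, so the variable is declared and initialized before this use and 
the code compiles; this is not an uninitialized variable. If Test 6 is meant 
to be independent of Test 4, create a new seeded `Random` before this line; 
otherwise keep using the `random` instance from line 478.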



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.StoredFields;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnByteVectorQuery;
+import org.apache.lucene.search.KnnFloatVectorQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.MMapDirectory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Vector global index reader using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final List<IndexSearcher> searchers;
+    private final List<Directory> directories;
+    private final List<java.nio.file.Path> tempDirs;
+
+    public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files)
+            throws IOException {
+        this.searchers = new ArrayList<>();
+        this.directories = new ArrayList<>();
+        this.tempDirs = new ArrayList<>();
+        loadIndices(fileReader, files);
+    }
+
+    /**
+     * Search for similar vectors using Lucene KNN search.
+     *
+     * @param query query vector
+     * @param k number of results
+     * @return global index result containing row IDs
+     */
+    public GlobalIndexResult search(float[] query, int k) {
+        KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    public GlobalIndexResult search(byte[] query, int k) {
+        KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Close readers
+        for (IndexSearcher searcher : searchers) {
+            searcher.getIndexReader().close();
+        }
+        searchers.clear();
+
+        // Close directories
+        for (Directory directory : directories) {
+            directory.close();
+        }
+        directories.clear();
+
+        // Clean up temp directories
+        for (java.nio.file.Path tempDir : tempDirs) {
+            deleteDirectory(tempDir);
+        }
+        tempDirs.clear();
+    }
+
+    private GlobalIndexResult search(Query query, int k) {
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        for (IndexSearcher searcher : searchers) {
+            try {
+                // Execute search
+                TopDocs topDocs = searcher.search(query, k);
+                StoredFields storedFields = searcher.storedFields();
+                Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD);
+                // Collect row IDs from results
+                for (org.apache.lucene.search.ScoreDoc scoreDoc : topDocs.scoreDocs) {
+                    float rawScore = scoreDoc.score;
+                    Document doc = storedFields.document(scoreDoc.doc, fieldsToLoad);
+                    long rowId = doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue();
+                    roaringBitmap64.add(rowId);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to search vector index", e);
+            }
+        }
+
+        return GlobalIndexResult.create(() -> roaringBitmap64);
+    }
+
+    private void loadIndices(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files)
+            throws IOException {
+        for (GlobalIndexIOMeta meta : files) {
+            try (SeekableInputStream in = fileReader.getInputStream(meta.fileName())) {
+                byte[] indexBytes = new byte[(int) meta.fileSize()];

Review Comment:
   Inefficient buffer allocation: The entire index file is loaded into memory 
at once using `new byte[(int) meta.fileSize()]`. For large vector indices, this 
could lead to OutOfMemoryError. Consider streaming the data or reading in 
chunks to reduce memory pressure.



##########
paimon-vector/src/main/java/org/apache/paimon/vector/ByteVectorIndex.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.lucene.document.KnnByteVectorField;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.VectorSimilarityFunction;
+
+/** Vector index for byte vector. */
+public class ByteVectorIndex extends VectorIndex {
+    private final long rowId;
+    private final byte[] vector;
+
+    public ByteVectorIndex(long rowId, byte[] vector) {
+        this.rowId = rowId;
+        this.vector = vector;
+    }
+

Review Comment:
   Missing `@Override` annotation: The `rowId()` method overrides a method from 
the parent class `VectorIndex` but is missing the `@Override` annotation. This 
should be added for consistency with the `indexableField()` method at line 39 
and with `FloatVectorIndex.java`.
   ```suggestion
   
       @Override
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to