Copilot commented on code in PR #6716:
URL: https://github.com/apache/paimon/pull/6716#discussion_r2596884664


##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.Range;
+
+import org.apache.lucene.codecs.KnnVectorsFormat;
+import org.apache.lucene.codecs.lucene912.Lucene912Codec;
+import 
org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Vector global index writer using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW 
algorithm for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexWriter implements GlobalIndexWriter {
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final VectorIndexOptions vectorOptions;
+    private final VectorSimilarityFunction similarityFunction;
+    private final int sizePerIndex;
+
+    private final List<VectorIndex> vectors;
+
+    public VectorGlobalIndexWriter(
+            GlobalIndexFileWriter fileWriter, DataType fieldType, Options 
options) {
+        checkArgument(
+                fieldType instanceof ArrayType,
+                "Vector field type must be ARRAY, but was: " + fieldType);
+        this.fileWriter = fileWriter;
+        this.vectors = new ArrayList<>();
+        this.vectorOptions = new VectorIndexOptions(options);
+        this.similarityFunction = parseMetricToLucene(vectorOptions.metric());
+        this.sizePerIndex = vectorOptions.sizePerIndex();
+    }
+
+    @Override
+    public void write(Object key) {
+        if (key instanceof FloatVectorIndex) {
+            FloatVectorIndex vectorKey = (FloatVectorIndex) key;
+            float[] vector = vectorKey.vector();
+
+            checkArgument(
+                    vector.length == vectorOptions.dimension(),
+                    "Vector dimension mismatch: expected "
+                            + vectorOptions.dimension()
+                            + ", but got "
+                            + vector.length);
+
+            vectors.add(vectorKey);
+        } else if (key instanceof ByteVectorIndex) {
+            ByteVectorIndex vectorKey = (ByteVectorIndex) key;
+            byte[] byteVector = vectorKey.vector();
+
+            checkArgument(
+                    byteVector.length == vectorOptions.dimension(),
+                    "Vector dimension mismatch: expected "
+                            + vectorOptions.dimension()
+                            + ", but got "
+                            + byteVector.length);
+
+            vectors.add(vectorKey);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported index type: " + key.getClass().getName());
+        }
+    }
+
+    @Override
+    public List<ResultEntry> finish() {
+        try {
+            if (vectors.isEmpty()) {
+                return new ArrayList<>();
+            }
+
+            List<ResultEntry> results = new ArrayList<>();
+
+            // Split vectors into batches if size exceeds sizePerIndex
+            int totalVectors = vectors.size();
+            int numBatches = (int) Math.ceil((double) totalVectors / 
sizePerIndex);
+
+            for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) {
+                int startIdx = batchIndex * sizePerIndex;
+                int endIdx = Math.min(startIdx + sizePerIndex, totalVectors);
+                List<VectorIndex> batchVectors = vectors.subList(startIdx, 
endIdx);
+
+                String fileName = 
fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER);
+                try (OutputStream out = fileWriter.newOutputStream(fileName)) {
+                    buildIndex(
+                            batchVectors,
+                            this.vectorOptions.m(),
+                            this.vectorOptions.efConstruction(),
+                            this.vectorOptions.writeBufferSize(),
+                            out);
+                }
+                long minRowIdInBatch = batchVectors.get(0).rowId();
+                long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 
1).rowId();

Review Comment:
   The code assumes that vectors are added in sorted order by rowId when 
calculating min/max row IDs from the first and last elements of `batchVectors`. 
If vectors are not sorted by rowId, this will produce incorrect row ranges in 
the `ResultEntry`. Consider either documenting this requirement or computing 
the actual min/max by iterating through the batch.
   ```suggestion
                   long minRowIdInBatch = Long.MAX_VALUE;
                   long maxRowIdInBatch = Long.MIN_VALUE;
                   for (VectorIndex v : batchVectors) {
                       long rowId = v.rowId();
                       if (rowId < minRowIdInBatch) minRowIdInBatch = rowId;
                       if (rowId > maxRowIdInBatch) maxRowIdInBatch = rowId;
                   }
   ```



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.StoredFields;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnByteVectorQuery;
+import org.apache.lucene.search.KnnFloatVectorQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.IndexOutput;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * Vector global index reader using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW 
graph for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexReader implements GlobalIndexReader {
+
+    private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming
+
+    private final List<IndexSearcher> searchers;
+    private final List<IndexMMapDirectory> directories;
+
+    public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        this.searchers = new ArrayList<>();
+        this.directories = new ArrayList<>();
+        loadIndices(fileReader, files);
+    }
+
+    /**
+     * Search for similar vectors using Lucene KNN search.
+     *
+     * @param query query vector
+     * @param k number of results
+     * @return global index result containing row IDs
+     */
+    public GlobalIndexResult search(float[] query, int k) {
+        KnnFloatVectorQuery knnQuery = new 
KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    public GlobalIndexResult search(byte[] query, int k) {
+        KnnByteVectorQuery knnQuery = new 
KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Close readers
+        for (IndexSearcher searcher : searchers) {
+            searcher.getIndexReader().close();
+        }
+        searchers.clear();
+
+        // Close directories
+        for (IndexMMapDirectory directory : directories) {
+            try {
+                directory.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        directories.clear();
+    }
+
+    private GlobalIndexResult search(Query query, int k) {
+        PriorityQueue<ScoredRow> topK =
+                new PriorityQueue<>(Comparator.comparingDouble(sr -> 
sr.score));
+        for (IndexSearcher searcher : searchers) {
+            try {
+                TopDocs topDocs = searcher.search(query, k);
+                StoredFields storedFields = searcher.storedFields();
+                Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD);
+                for (org.apache.lucene.search.ScoreDoc scoreDoc : 
topDocs.scoreDocs) {
+                    Document doc = storedFields.document(scoreDoc.doc, 
fieldsToLoad);
+                    long rowId = 
doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue();
+                    if (topK.size() < k) {
+                        topK.offer(new ScoredRow(rowId, scoreDoc.score));
+                    } else {
+                        if (topK.peek() != null && scoreDoc.score > 
topK.peek().score) {
+                            topK.poll();
+                            topK.offer(new ScoredRow(rowId, scoreDoc.score));
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to search vector index", e);
+            }
+        }
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        for (ScoredRow scoredRow : topK) {
+            roaringBitmap64.add(scoredRow.rowId);
+        }
+        return GlobalIndexResult.create(() -> roaringBitmap64);
+    }
+
+    /** Helper class to store row ID with its score. */
+    private static class ScoredRow {
+        final long rowId;
+        final float score;
+
+        ScoredRow(long rowId, float score) {
+            this.rowId = rowId;
+            this.score = score;
+        }
+    }
+
+    private void loadIndices(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        for (GlobalIndexIOMeta meta : files) {
+            try (SeekableInputStream in = 
fileReader.getInputStream(meta.fileName())) {
+                IndexMMapDirectory directory = null;
+                IndexReader reader = null;
+                boolean success = false;
+                try {
+                    directory = deserializeDirectory(in);
+                    reader = DirectoryReader.open(directory.directory());
+                    IndexSearcher searcher = new IndexSearcher(reader);
+                    directories.add(directory);
+                    searchers.add(searcher);
+                    success = true;
+                } finally {
+                    if (!success) {
+                        if (reader != null) {
+                            try {
+                                reader.close();
+                            } catch (IOException e) {
+                            }
+                        }
+                        if (directory != null) {
+                            try {
+                                directory.close();
+                            } catch (Exception e) {
+                                throw new IOException("Failed to close 
directory", e);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private IndexMMapDirectory deserializeDirectory(SeekableInputStream in) 
throws IOException {
+        IndexMMapDirectory indexMMapDirectory = new IndexMMapDirectory();
+
+        // Read number of files
+        int numFiles = readInt(in);
+
+        // Reusable buffer for streaming
+        byte[] buffer = new byte[BUFFER_SIZE];
+
+        for (int i = 0; i < numFiles; i++) {
+            // Read file name
+            int nameLength = readInt(in);
+            byte[] nameBytes = new byte[nameLength];
+            readFully(in, nameBytes);
+            String fileName = new String(nameBytes, StandardCharsets.UTF_8);
+
+            // Read file content length
+            long fileLength = readLong(in);
+
+            // Stream file content directly to directory
+            try (IndexOutput output = 
indexMMapDirectory.directory().createOutput(fileName, null)) {
+                long remaining = fileLength;
+                while (remaining > 0) {
+                    int toRead = (int) Math.min(buffer.length, remaining);
+                    readFully(in, buffer, 0, toRead);
+                    output.writeBytes(buffer, 0, toRead);
+                    remaining -= toRead;
+                }
+            }
+        }
+
+        return indexMMapDirectory;

Review Comment:
   The `deserializeDirectory` method creates an `IndexMMapDirectory` (including 
a temp directory on disk) but doesn't handle cleanup if an exception occurs 
during deserialization. If an error occurs at lines 186-211, the temporary 
directory and its resources will be leaked. Consider wrapping the 
deserialization logic in a try-catch block and closing the directory on failure.
   ```suggestion
           boolean success = false;
           try {
               // Read number of files
               int numFiles = readInt(in);
   
               // Reusable buffer for streaming
               byte[] buffer = new byte[BUFFER_SIZE];
   
               for (int i = 0; i < numFiles; i++) {
                   // Read file name
                   int nameLength = readInt(in);
                   byte[] nameBytes = new byte[nameLength];
                   readFully(in, nameBytes);
                   String fileName = new String(nameBytes, 
StandardCharsets.UTF_8);
   
                   // Read file content length
                   long fileLength = readLong(in);
   
                   // Stream file content directly to directory
                   try (IndexOutput output = 
indexMMapDirectory.directory().createOutput(fileName, null)) {
                       long remaining = fileLength;
                       while (remaining > 0) {
                           int toRead = (int) Math.min(buffer.length, 
remaining);
                           readFully(in, buffer, 0, toRead);
                           output.writeBytes(buffer, 0, toRead);
                           remaining -= toRead;
                       }
                   }
               }
               success = true;
               return indexMMapDirectory;
           } catch (Exception e) {
               try {
                   indexMMapDirectory.close();
               } catch (Exception closeEx) {
                   // Suppress to not mask the original exception
               }
               if (e instanceof IOException) {
                   throw (IOException) e;
               } else {
                   throw new IOException("Failed to deserialize directory", e);
               }
           }
   ```



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.Range;
+
+import org.apache.lucene.codecs.KnnVectorsFormat;
+import org.apache.lucene.codecs.lucene912.Lucene912Codec;
+import 
org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Vector global index writer using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW 
algorithm for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexWriter implements GlobalIndexWriter {
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final VectorIndexOptions vectorOptions;
+    private final VectorSimilarityFunction similarityFunction;
+    private final int sizePerIndex;
+
+    private final List<VectorIndex> vectors;
+
+    public VectorGlobalIndexWriter(
+            GlobalIndexFileWriter fileWriter, DataType fieldType, Options 
options) {
+        checkArgument(
+                fieldType instanceof ArrayType,
+                "Vector field type must be ARRAY, but was: " + fieldType);
+        this.fileWriter = fileWriter;
+        this.vectors = new ArrayList<>();
+        this.vectorOptions = new VectorIndexOptions(options);
+        this.similarityFunction = parseMetricToLucene(vectorOptions.metric());
+        this.sizePerIndex = vectorOptions.sizePerIndex();
+    }
+
+    @Override
+    public void write(Object key) {
+        if (key instanceof FloatVectorIndex) {
+            FloatVectorIndex vectorKey = (FloatVectorIndex) key;
+            float[] vector = vectorKey.vector();
+
+            checkArgument(
+                    vector.length == vectorOptions.dimension(),
+                    "Vector dimension mismatch: expected "
+                            + vectorOptions.dimension()
+                            + ", but got "
+                            + vector.length);
+
+            vectors.add(vectorKey);
+        } else if (key instanceof ByteVectorIndex) {
+            ByteVectorIndex vectorKey = (ByteVectorIndex) key;
+            byte[] byteVector = vectorKey.vector();
+
+            checkArgument(
+                    byteVector.length == vectorOptions.dimension(),
+                    "Vector dimension mismatch: expected "
+                            + vectorOptions.dimension()
+                            + ", but got "
+                            + byteVector.length);
+
+            vectors.add(vectorKey);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported index type: " + key.getClass().getName());
+        }
+    }
+
+    @Override
+    public List<ResultEntry> finish() {
+        try {
+            if (vectors.isEmpty()) {
+                return new ArrayList<>();
+            }
+
+            List<ResultEntry> results = new ArrayList<>();
+
+            // Split vectors into batches if size exceeds sizePerIndex
+            int totalVectors = vectors.size();
+            int numBatches = (int) Math.ceil((double) totalVectors / 
sizePerIndex);
+
+            for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) {
+                int startIdx = batchIndex * sizePerIndex;
+                int endIdx = Math.min(startIdx + sizePerIndex, totalVectors);
+                List<VectorIndex> batchVectors = vectors.subList(startIdx, 
endIdx);
+
+                String fileName = 
fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER);
+                try (OutputStream out = fileWriter.newOutputStream(fileName)) {
+                    buildIndex(
+                            batchVectors,
+                            this.vectorOptions.m(),
+                            this.vectorOptions.efConstruction(),
+                            this.vectorOptions.writeBufferSize(),
+                            out);
+                }
+                long minRowIdInBatch = batchVectors.get(0).rowId();
+                long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 
1).rowId();
+                results.add(
+                        ResultEntry.of(
+                                fileName, null, new Range(minRowIdInBatch, 
maxRowIdInBatch)));
+            }
+
+            return results;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write vector global index", 
e);
+        }
+    }
+
+    private VectorSimilarityFunction parseMetricToLucene(String metric) {
+        switch (metric.toUpperCase()) {
+            case "COSINE":
+                return VectorSimilarityFunction.COSINE;
+            case "DOT_PRODUCT":
+                return VectorSimilarityFunction.DOT_PRODUCT;
+            case "EUCLIDEAN":
+                return VectorSimilarityFunction.EUCLIDEAN;
+            case "MAX_INNER_PRODUCT":
+                return VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT;
+            default:
+                throw new IllegalArgumentException("Unsupported metric: " + 
metric);

Review Comment:
   The `parseMetricToLucene` method manually maps string values to Lucene 
similarity functions using a switch statement. This duplicates the logic 
already encapsulated in the `VectorMetric` enum. Consider refactoring to use 
`VectorMetric.valueOf(metric.toUpperCase()).toLuceneFunction()` to centralize 
this mapping logic and improve maintainability.
   ```suggestion
           try {
               return 
VectorMetric.valueOf(metric.toUpperCase()).toLuceneFunction();
           } catch (IllegalArgumentException e) {
               throw new IllegalArgumentException("Unsupported metric: " + 
metric, e);
   ```



##########
paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link VectorGlobalIndexWriter} and {@link 
VectorGlobalIndexReader}. */
+public class VectorGlobalIndexTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private FileIO fileIO;
+    private Path indexPath;
+    private DataType vectorType;
+
+    @BeforeEach
+    public void setup() {
+        fileIO = new LocalFileIO();
+        indexPath = new Path(tempDir.toString());
+        vectorType = new ArrayType(new FloatType());
+    }
+
+    @AfterEach
+    public void cleanup() throws IOException {
+        if (fileIO != null) {
+            fileIO.delete(indexPath, true);
+        }
+    }
+
+    private GlobalIndexFileWriter createFileWriter(Path path) {
+        return new GlobalIndexFileWriter() {
+            @Override
+            public String newFileName(String prefix) {
+                return prefix + "-" + UUID.randomUUID();
+            }
+
+            @Override
+            public OutputStream newOutputStream(String fileName) throws 
IOException {
+                return fileIO.newOutputStream(new Path(path, fileName), false);
+            }
+        };
+    }
+
+    private GlobalIndexFileReader createFileReader(Path path) {
+        return fileName -> fileIO.newInputStream(new Path(path, fileName));
+    }
+
+    @Test
+    public void testDifferentSimilarityFunctions() throws IOException {
+        int dimension = 32;
+        int numVectors = 20;
+
+        String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"};
+
+        for (String metric : metrics) {
+            Options options = createDefaultOptions(dimension);
+            options.setString("vector.metric", metric);
+
+            Path metricIndexPath = new Path(indexPath, metric.toLowerCase());
+            GlobalIndexFileWriter fileWriter = 
createFileWriter(metricIndexPath);
+            VectorGlobalIndexWriter writer =
+                    new VectorGlobalIndexWriter(fileWriter, vectorType, 
options);
+
+            List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+            for (int i = 0; i < numVectors; i++) {
+                writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+            }
+
+            List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+            assertThat(results).hasSize(1);
+
+            GlobalIndexWriter.ResultEntry result = results.get(0);
+            GlobalIndexFileReader fileReader = 
createFileReader(metricIndexPath);
+            List<GlobalIndexIOMeta> metas = new ArrayList<>();
+            metas.add(
+                    new GlobalIndexIOMeta(
+                            result.fileName(),
+                            fileIO.getFileSize(new Path(metricIndexPath, 
result.fileName())),
+                            result.rowRange(),
+                            result.meta()));
+
+            VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+            // Verify search works with this metric
+            GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
3);
+            assertThat(searchResult).isNotNull();
+
+            reader.close();

Review Comment:
   The `VectorGlobalIndexReader` is only closed on the success path: if 
`reader.search(...)` throws or the assertion fails, `reader.close()` is never 
reached, potentially leaking resources (IndexSearcher, IndexReader, and 
IndexMMapDirectory with temp files). Consider using try-with-resources or 
moving `reader.close()` into a finally block to ensure proper cleanup.
   ```suggestion
               try (VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas)) {
   
                   // Verify search works with this metric
                   GlobalIndexResult searchResult = 
reader.search(testVectors.get(0), 3);
                   assertThat(searchResult).isNotNull();
               }
   ```



##########
paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link VectorGlobalIndexWriter} and {@link 
VectorGlobalIndexReader}. */
+public class VectorGlobalIndexTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private FileIO fileIO;
+    private Path indexPath;
+    private DataType vectorType;
+
+    @BeforeEach
+    public void setup() {
+        fileIO = new LocalFileIO();
+        indexPath = new Path(tempDir.toString());
+        vectorType = new ArrayType(new FloatType());
+    }
+
+    @AfterEach
+    public void cleanup() throws IOException {
+        if (fileIO != null) {
+            fileIO.delete(indexPath, true);
+        }
+    }
+
+    private GlobalIndexFileWriter createFileWriter(Path path) {
+        return new GlobalIndexFileWriter() {
+            @Override
+            public String newFileName(String prefix) {
+                return prefix + "-" + UUID.randomUUID();
+            }
+
+            @Override
+            public OutputStream newOutputStream(String fileName) throws 
IOException {
+                return fileIO.newOutputStream(new Path(path, fileName), false);
+            }
+        };
+    }
+
+    private GlobalIndexFileReader createFileReader(Path path) {
+        return fileName -> fileIO.newInputStream(new Path(path, fileName));
+    }
+
+    @Test
+    public void testDifferentSimilarityFunctions() throws IOException {
+        int dimension = 32;
+        int numVectors = 20;
+
+        String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"};
+
+        for (String metric : metrics) {
+            Options options = createDefaultOptions(dimension);
+            options.setString("vector.metric", metric);
+
+            Path metricIndexPath = new Path(indexPath, metric.toLowerCase());
+            GlobalIndexFileWriter fileWriter = 
createFileWriter(metricIndexPath);
+            VectorGlobalIndexWriter writer =
+                    new VectorGlobalIndexWriter(fileWriter, vectorType, 
options);
+
+            List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+            for (int i = 0; i < numVectors; i++) {
+                writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+            }
+
+            List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+            assertThat(results).hasSize(1);
+
+            GlobalIndexWriter.ResultEntry result = results.get(0);
+            GlobalIndexFileReader fileReader = 
createFileReader(metricIndexPath);
+            List<GlobalIndexIOMeta> metas = new ArrayList<>();
+            metas.add(
+                    new GlobalIndexIOMeta(
+                            result.fileName(),
+                            fileIO.getFileSize(new Path(metricIndexPath, 
result.fileName())),
+                            result.rowRange(),
+                            result.meta()));
+
+            VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+            // Verify search works with this metric
+            GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
3);
+            assertThat(searchResult).isNotNull();
+
+            reader.close();
+        }
+    }
+
+    @Test
+    public void testDifferentDimensions() throws IOException {
+        int[] dimensions = {8, 32, 128, 256};
+
+        for (int dimension : dimensions) {
+            Options options = createDefaultOptions(dimension);
+
+            Path dimIndexPath = new Path(indexPath, "dim_" + dimension);
+            GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath);
+            VectorGlobalIndexWriter writer =
+                    new VectorGlobalIndexWriter(fileWriter, vectorType, 
options);
+
+            int numVectors = 10;
+            List<float[]> testVectors = generateRandomVectors(numVectors, 
dimension);
+            for (int i = 0; i < numVectors; i++) {
+                writer.write(new FloatVectorIndex(i, testVectors.get(i)));
+            }
+
+            List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+            assertThat(results).hasSize(1);
+
+            GlobalIndexWriter.ResultEntry result = results.get(0);
+            GlobalIndexFileReader fileReader = createFileReader(dimIndexPath);
+            List<GlobalIndexIOMeta> metas = new ArrayList<>();
+            metas.add(
+                    new GlobalIndexIOMeta(
+                            result.fileName(),
+                            fileIO.getFileSize(new Path(dimIndexPath, 
result.fileName())),
+                            result.rowRange(),
+                            result.meta()));
+
+            VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas);
+
+            // Verify search works with this dimension
+            GlobalIndexResult searchResult = reader.search(testVectors.get(0), 
5);
+            assertThat(searchResult).isNotNull();
+
+            reader.close();

Review Comment:
   The `VectorGlobalIndexReader` is closed only on the success path: if 
`reader.search(...)` or the assertion throws, `reader.close()` is never 
reached, potentially leaking resources (IndexSearcher, IndexReader, and 
IndexMMapDirectory with temp files). Consider using try-with-resources so the 
reader is closed even when the test body fails.
   ```suggestion
               try (VectorGlobalIndexReader reader = new 
VectorGlobalIndexReader(fileReader, metas)) {
   
                   // Verify search works with this dimension
                   GlobalIndexResult searchResult = 
reader.search(testVectors.get(0), 5);
                   assertThat(searchResult).isNotNull();
               }
   ```



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.Range;
+
+import org.apache.lucene.codecs.KnnVectorsFormat;
+import org.apache.lucene.codecs.lucene912.Lucene912Codec;
+import 
org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Vector global index writer using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW 
algorithm for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexWriter implements GlobalIndexWriter {
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final VectorIndexOptions vectorOptions;
+    private final VectorSimilarityFunction similarityFunction;
+    private final int sizePerIndex;
+
+    private final List<VectorIndex> vectors;
+
+    public VectorGlobalIndexWriter(
+            GlobalIndexFileWriter fileWriter, DataType fieldType, Options 
options) {
+        checkArgument(
+                fieldType instanceof ArrayType,
+                "Vector field type must be ARRAY, but was: " + fieldType);
+        this.fileWriter = fileWriter;
+        this.vectors = new ArrayList<>();
+        this.vectorOptions = new VectorIndexOptions(options);
+        this.similarityFunction = parseMetricToLucene(vectorOptions.metric());
+        this.sizePerIndex = vectorOptions.sizePerIndex();
+    }
+
+    @Override
+    public void write(Object key) {
+        if (key instanceof FloatVectorIndex) {
+            FloatVectorIndex vectorKey = (FloatVectorIndex) key;
+            float[] vector = vectorKey.vector();
+
+            checkArgument(
+                    vector.length == vectorOptions.dimension(),
+                    "Vector dimension mismatch: expected "
+                            + vectorOptions.dimension()
+                            + ", but got "
+                            + vector.length);
+
+            vectors.add(vectorKey);
+        } else if (key instanceof ByteVectorIndex) {
+            ByteVectorIndex vectorKey = (ByteVectorIndex) key;
+            byte[] byteVector = vectorKey.vector();
+
+            checkArgument(
+                    byteVector.length == vectorOptions.dimension(),
+                    "Vector dimension mismatch: expected "
+                            + vectorOptions.dimension()
+                            + ", but got "
+                            + byteVector.length);
+
+            vectors.add(vectorKey);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported index type: " + key.getClass().getName());
+        }
+    }
+
+    @Override
+    public List<ResultEntry> finish() {
+        try {
+            if (vectors.isEmpty()) {
+                return new ArrayList<>();
+            }
+
+            List<ResultEntry> results = new ArrayList<>();
+
+            // Split vectors into batches if size exceeds sizePerIndex
+            int totalVectors = vectors.size();
+            int numBatches = (int) Math.ceil((double) totalVectors / 
sizePerIndex);
+
+            for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) {
+                int startIdx = batchIndex * sizePerIndex;
+                int endIdx = Math.min(startIdx + sizePerIndex, totalVectors);
+                List<VectorIndex> batchVectors = vectors.subList(startIdx, 
endIdx);
+
+                String fileName = 
fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER);
+                try (OutputStream out = fileWriter.newOutputStream(fileName)) {
+                    buildIndex(
+                            batchVectors,
+                            this.vectorOptions.m(),
+                            this.vectorOptions.efConstruction(),
+                            this.vectorOptions.writeBufferSize(),
+                            out);
+                }
+                long minRowIdInBatch = batchVectors.get(0).rowId();
+                long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 
1).rowId();
+                results.add(
+                        ResultEntry.of(
+                                fileName, null, new Range(minRowIdInBatch, 
maxRowIdInBatch)));
+            }
+
+            return results;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write vector global index", 
e);
+        }
+    }
+
+    private VectorSimilarityFunction parseMetricToLucene(String metric) {
+        switch (metric.toUpperCase()) {
+            case "COSINE":
+                return VectorSimilarityFunction.COSINE;
+            case "DOT_PRODUCT":
+                return VectorSimilarityFunction.DOT_PRODUCT;
+            case "EUCLIDEAN":
+                return VectorSimilarityFunction.EUCLIDEAN;
+            case "MAX_INNER_PRODUCT":
+                return VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT;
+            default:
+                throw new IllegalArgumentException("Unsupported metric: " + 
metric);
+        }
+    }
+
+    private void buildIndex(
+            List<VectorIndex> batchVectors,
+            int m,
+            int efConstruction,
+            int writeBufferSize,
+            OutputStream out)
+            throws IOException {
+
+        IndexWriterConfig config = getIndexWriterConfig(m, efConstruction, 
writeBufferSize);
+        try (IndexMMapDirectory indexMMapDirectory = new IndexMMapDirectory()) 
{
+            try (IndexWriter writer = new 
IndexWriter(indexMMapDirectory.directory(), config)) {
+                for (VectorIndex vectorIndex : batchVectors) {
+                    Document doc = new Document();
+                    doc.add(vectorIndex.indexableField(similarityFunction));
+                    doc.add(vectorIndex.rowIdStoredField());
+                    writer.addDocument(doc);
+                }
+                writer.commit();
+            }
+
+            serializeDirectory(indexMMapDirectory.directory(), out);
+        } catch (Exception e) {
+            throw new RuntimeException(e);

Review Comment:
   The exception is wrapped in a RuntimeException with no additional context 
about what operation failed. Consider providing a more descriptive error 
message like "Failed to build vector index" to help with debugging.
   ```suggestion
               throw new RuntimeException("Failed to build vector index", e);
   ```



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.StoredFields;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnByteVectorQuery;
+import org.apache.lucene.search.KnnFloatVectorQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.IndexOutput;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * Vector global index reader using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW 
graph for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexReader implements GlobalIndexReader {
+
+    private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming
+
+    private final List<IndexSearcher> searchers;
+    private final List<IndexMMapDirectory> directories;
+
+    public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        this.searchers = new ArrayList<>();
+        this.directories = new ArrayList<>();
+        loadIndices(fileReader, files);
+    }
+
+    /**
+     * Search for similar vectors using Lucene KNN search.
+     *
+     * @param query query vector
+     * @param k number of results
+     * @return global index result containing row IDs
+     */
+    public GlobalIndexResult search(float[] query, int k) {
+        KnnFloatVectorQuery knnQuery = new 
KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    public GlobalIndexResult search(byte[] query, int k) {
+        KnnByteVectorQuery knnQuery = new 
KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Close readers
+        for (IndexSearcher searcher : searchers) {
+            searcher.getIndexReader().close();
+        }
+        searchers.clear();
+
+        // Close directories
+        for (IndexMMapDirectory directory : directories) {
+            try {
+                directory.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        directories.clear();
+    }
+
+    private GlobalIndexResult search(Query query, int k) {
+        PriorityQueue<ScoredRow> topK =
+                new PriorityQueue<>(Comparator.comparingDouble(sr -> 
sr.score));
+        for (IndexSearcher searcher : searchers) {
+            try {
+                TopDocs topDocs = searcher.search(query, k);
+                StoredFields storedFields = searcher.storedFields();
+                Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD);
+                for (org.apache.lucene.search.ScoreDoc scoreDoc : 
topDocs.scoreDocs) {
+                    Document doc = storedFields.document(scoreDoc.doc, 
fieldsToLoad);
+                    long rowId = 
doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue();
+                    if (topK.size() < k) {
+                        topK.offer(new ScoredRow(rowId, scoreDoc.score));
+                    } else {
+                        if (topK.peek() != null && scoreDoc.score > 
topK.peek().score) {
+                            topK.poll();
+                            topK.offer(new ScoredRow(rowId, scoreDoc.score));
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to search vector index", e);
+            }
+        }
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        for (ScoredRow scoredRow : topK) {
+            roaringBitmap64.add(scoredRow.rowId);
+        }
+        return GlobalIndexResult.create(() -> roaringBitmap64);
+    }
+
+    /** Helper class to store row ID with its score. */
+    private static class ScoredRow {
+        final long rowId;
+        final float score;
+
+        ScoredRow(long rowId, float score) {
+            this.rowId = rowId;
+            this.score = score;
+        }
+    }
+
+    private void loadIndices(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        for (GlobalIndexIOMeta meta : files) {
+            try (SeekableInputStream in = 
fileReader.getInputStream(meta.fileName())) {
+                IndexMMapDirectory directory = null;
+                IndexReader reader = null;
+                boolean success = false;
+                try {
+                    directory = deserializeDirectory(in);
+                    reader = DirectoryReader.open(directory.directory());
+                    IndexSearcher searcher = new IndexSearcher(reader);
+                    directories.add(directory);
+                    searchers.add(searcher);
+                    success = true;
+                } finally {
+                    if (!success) {
+                        if (reader != null) {
+                            try {
+                                reader.close();
+                            } catch (IOException e) {
+                            }
+                        }
+                        if (directory != null) {
+                            try {
+                                directory.close();
+                            } catch (Exception e) {
+                                throw new IOException("Failed to close 
directory", e);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private IndexMMapDirectory deserializeDirectory(SeekableInputStream in) 
throws IOException {
+        IndexMMapDirectory indexMMapDirectory = new IndexMMapDirectory();
+
+        // Read number of files
+        int numFiles = readInt(in);
+
+        // Reusable buffer for streaming
+        byte[] buffer = new byte[BUFFER_SIZE];
+
+        for (int i = 0; i < numFiles; i++) {
+            // Read file name
+            int nameLength = readInt(in);
+            byte[] nameBytes = new byte[nameLength];
+            readFully(in, nameBytes);
+            String fileName = new String(nameBytes, StandardCharsets.UTF_8);
+
+            // Read file content length
+            long fileLength = readLong(in);
+
+            // Stream file content directly to directory
+            try (IndexOutput output = 
indexMMapDirectory.directory().createOutput(fileName, null)) {
+                long remaining = fileLength;
+                while (remaining > 0) {
+                    int toRead = (int) Math.min(buffer.length, remaining);
+                    readFully(in, buffer, 0, toRead);
+                    output.writeBytes(buffer, 0, toRead);
+                    remaining -= toRead;
+                }
+            }
+        }
+
+        return indexMMapDirectory;
+    }
+
+    private int readInt(SeekableInputStream in) throws IOException {
+        byte[] bytes = new byte[4];
+        readFully(in, bytes);
+        return ByteBuffer.wrap(bytes).getInt();
+    }
+
+    private long readLong(SeekableInputStream in) throws IOException {
+        byte[] bytes = new byte[8];
+        readFully(in, bytes);
+        return ByteBuffer.wrap(bytes).getLong();
+    }
+
+    private void readFully(SeekableInputStream in, byte[] buffer) throws 
IOException {
+        readFully(in, buffer, 0, buffer.length);
+    }
+
+    private void readFully(SeekableInputStream in, byte[] buffer, int offset, 
int length)
+            throws IOException {
+        int totalRead = 0;
+        while (totalRead < length) {
+            int read = in.read(buffer, offset + totalRead, length - totalRead);
+            if (read == -1) {
+                throw new IOException("Unexpected end of stream");
+            }
+            totalRead += read;
+        }
+    }
+
+    private void deleteDirectory(java.nio.file.Path path) throws IOException {
+        if (java.nio.file.Files.exists(path)) {
+            java.nio.file.Files.walk(path)
+                    .sorted(java.util.Comparator.reverseOrder())
+                    .forEach(
+                            p -> {
+                                try {
+                                    java.nio.file.Files.delete(p);
+                                } catch (IOException e) {
+                                    // Ignore cleanup errors
+                                }
+                            });
+        }
+    }
+

Review Comment:
   The `deleteDirectory` method is defined but never called anywhere in the 
class. This appears to be dead code that should be removed to improve code 
maintainability.
   ```suggestion
   
   ```



##########
paimon-vector/src/main/java/org/apache/paimon/vector/IndexMMapDirectory.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.lucene.store.MMapDirectory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.UUID;
+
+/** A wrapper of MMapDirectory for vector index. */
+public class IndexMMapDirectory implements AutoCloseable {
+    private final Path path;
+    private final MMapDirectory mmapDirectory;
+
+    public IndexMMapDirectory() throws IOException {
+        this.path =
+                java.nio.file.Files.createTempDirectory("paimon-vector-index-" 
+ UUID.randomUUID());
+        this.mmapDirectory = new MMapDirectory(path);
+    }
+
+    public MMapDirectory directory() {
+        return mmapDirectory;
+    }
+
+    public void close() throws Exception {
+        mmapDirectory.close();
+        if (java.nio.file.Files.exists(path)) {
+            java.nio.file.Files.walk(path)
+                    .sorted(java.util.Comparator.reverseOrder())
+                    .forEach(
+                            p -> {
+                                try {
+                                    java.nio.file.Files.delete(p);
+                                } catch (IOException e) {
+                                    // Ignore cleanup errors
+                                }

Review Comment:
   The IOException during cleanup is silently ignored. While cleanup errors can 
sometimes be acceptable to ignore, consider logging this exception at least at 
the DEBUG level to aid in troubleshooting potential file system issues.



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.StoredFields;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnByteVectorQuery;
+import org.apache.lucene.search.KnnFloatVectorQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.IndexOutput;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * Vector global index reader using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW 
graph for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexReader implements GlobalIndexReader {
+
+    private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming
+
+    private final List<IndexSearcher> searchers;
+    private final List<IndexMMapDirectory> directories;
+
+    public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        this.searchers = new ArrayList<>();
+        this.directories = new ArrayList<>();
+        loadIndices(fileReader, files);
+    }
+
+    /**
+     * Search for similar vectors using Lucene KNN search.
+     *
+     * @param query query vector
+     * @param k number of results
+     * @return global index result containing row IDs
+     */
+    public GlobalIndexResult search(float[] query, int k) {
+        KnnFloatVectorQuery knnQuery = new 
KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    public GlobalIndexResult search(byte[] query, int k) {
+        KnnByteVectorQuery knnQuery = new 
KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Close readers
+        for (IndexSearcher searcher : searchers) {
+            searcher.getIndexReader().close();
+        }
+        searchers.clear();
+
+        // Close directories
+        for (IndexMMapDirectory directory : directories) {
+            try {
+                directory.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        directories.clear();
+    }
+
+    private GlobalIndexResult search(Query query, int k) {
+        PriorityQueue<ScoredRow> topK =
+                new PriorityQueue<>(Comparator.comparingDouble(sr -> 
sr.score));
+        for (IndexSearcher searcher : searchers) {
+            try {
+                TopDocs topDocs = searcher.search(query, k);
+                StoredFields storedFields = searcher.storedFields();
+                Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD);
+                for (org.apache.lucene.search.ScoreDoc scoreDoc : 
topDocs.scoreDocs) {
+                    Document doc = storedFields.document(scoreDoc.doc, 
fieldsToLoad);
+                    long rowId = 
doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue();
+                    if (topK.size() < k) {
+                        topK.offer(new ScoredRow(rowId, scoreDoc.score));
+                    } else {
+                        if (topK.peek() != null && scoreDoc.score > 
topK.peek().score) {
+                            topK.poll();
+                            topK.offer(new ScoredRow(rowId, scoreDoc.score));
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to search vector index", e);
+            }
+        }
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        for (ScoredRow scoredRow : topK) {
+            roaringBitmap64.add(scoredRow.rowId);
+        }
+        return GlobalIndexResult.create(() -> roaringBitmap64);
+    }
+
+    /** Helper class to store row ID with its score. */
+    private static class ScoredRow {
+        final long rowId;
+        final float score;
+
+        ScoredRow(long rowId, float score) {
+            this.rowId = rowId;
+            this.score = score;
+        }
+    }
+
+    private void loadIndices(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        for (GlobalIndexIOMeta meta : files) {
+            try (SeekableInputStream in = 
fileReader.getInputStream(meta.fileName())) {
+                IndexMMapDirectory directory = null;
+                IndexReader reader = null;
+                boolean success = false;
+                try {
+                    directory = deserializeDirectory(in);
+                    reader = DirectoryReader.open(directory.directory());
+                    IndexSearcher searcher = new IndexSearcher(reader);
+                    directories.add(directory);
+                    searchers.add(searcher);
+                    success = true;
+                } finally {
+                    if (!success) {
+                        if (reader != null) {
+                            try {
+                                reader.close();
+                            } catch (IOException e) {
+                            }

Review Comment:
   The exception caught during reader.close() is silently ignored without any 
logging. This could mask important errors during cleanup. Consider logging this 
exception at least at the DEBUG or WARN level to aid in troubleshooting.



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorIndexOptions.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for vector index. */
+public class VectorIndexOptions {
+
+    public static final ConfigOption<Integer> VECTOR_DIM =
+            ConfigOptions.key("vector.dim")
+                    .intType()
+                    .defaultValue(128)
+                    .withDescription("The dimension of the vector");
+
+    public static final ConfigOption<String> VECTOR_METRIC =
+            ConfigOptions.key("vector.metric")
+                    .stringType()
+                    .defaultValue("EUCLIDEAN")
+                    .withDescription(
+                            "The similarity metric for vector search (COSINE, 
DOT_PRODUCT, EUCLIDEAN, MAX_INNER_PRODUCT), and EUCLIDEAN is the default");
+
+    public static final ConfigOption<Integer> VECTOR_M =
+            ConfigOptions.key("vector.m")
+                    .intType()
+                    .defaultValue(16)
+                    .withDescription(
+                            "The maximum number of connections for each 
element during the index construction");
+
+    public static final ConfigOption<Integer> VECTOR_EF_CONSTRUCTION =
+            ConfigOptions.key("vector.ef-construction")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription(
+                            "The size of the dynamic candidate list during the 
index construction");
+
+    public static final ConfigOption<Integer> VECTOR_SIZE_PER_INDEX =
+            ConfigOptions.key("vector.size-per-index")
+                    .intType()
+                    .defaultValue(10000)
+                    .withDescription("The size of vectors stored in each 
vector index file");
+
+    public static final ConfigOption<Integer> VECTOR_WRITE_BUFFER_SIZE =
+            ConfigOptions.key("vector.write-buffer-size")
+                    .intType()
+                    .defaultValue(256)
+                    .withDescription("write buffer size in MB for vector 
index");

Review Comment:
   The description starts with a lowercase letter and is inconsistent with 
other descriptions in the file. Consider changing "write buffer size" to "Write 
buffer size" for consistency.
   ```suggestion
                       .withDescription("Write buffer size in MB for vector 
index");
   ```



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorIndexOptions.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for vector index. */
+public class VectorIndexOptions {
+
+    public static final ConfigOption<Integer> VECTOR_DIM =
+            ConfigOptions.key("vector.dim")
+                    .intType()
+                    .defaultValue(128)
+                    .withDescription("The dimension of the vector");
+
+    public static final ConfigOption<String> VECTOR_METRIC =
+            ConfigOptions.key("vector.metric")
+                    .stringType()
+                    .defaultValue("EUCLIDEAN")
+                    .withDescription(
+                            "The similarity metric for vector search (COSINE, 
DOT_PRODUCT, EUCLIDEAN, MAX_INNER_PRODUCT), and EUCLIDEAN is the default");
+
+    public static final ConfigOption<Integer> VECTOR_M =
+            ConfigOptions.key("vector.m")
+                    .intType()
+                    .defaultValue(16)
+                    .withDescription(
+                            "The maximum number of connections for each 
element during the index construction");
+
+    public static final ConfigOption<Integer> VECTOR_EF_CONSTRUCTION =
+            ConfigOptions.key("vector.ef-construction")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription(
+                            "The size of the dynamic candidate list during the 
index construction");
+
+    public static final ConfigOption<Integer> VECTOR_SIZE_PER_INDEX =
+            ConfigOptions.key("vector.size-per-index")
+                    .intType()
+                    .defaultValue(10000)
+                    .withDescription("The size of vectors stored in each 
vector index file");
+
+    public static final ConfigOption<Integer> VECTOR_WRITE_BUFFER_SIZE =
+            ConfigOptions.key("vector.write-buffer-size")
+                    .intType()
+                    .defaultValue(256)
+                    .withDescription("write buffer size in MB for vector 
index");
+
+    private final int dimension;
+    private final String metric;
+    private final int m;
+    private final int efConstruction;
+    private final int sizePerIndex;
+    private final int writeBufferSize;
+
+    public VectorIndexOptions(Options options) {
+        this.dimension = options.get(VECTOR_DIM);
+        this.metric = options.get(VECTOR_METRIC);
+        this.m = options.get(VECTOR_M);
+        this.efConstruction = options.get(VECTOR_EF_CONSTRUCTION);
+        this.sizePerIndex =
+                options.get(VECTOR_SIZE_PER_INDEX) > 0
+                        ? options.get(VECTOR_SIZE_PER_INDEX)
+                        : VECTOR_SIZE_PER_INDEX.defaultValue();
+        this.writeBufferSize = options.get(VECTOR_WRITE_BUFFER_SIZE);
+    }
+

Review Comment:
   The constructor does not validate critical parameters. Consider adding 
validation for: (1) dimension > 0, (2) metric is one of the supported values 
(COSINE, DOT_PRODUCT, EUCLIDEAN, MAX_INNER_PRODUCT), (3) m > 0, (4) 
efConstruction > 0, and (5) writeBufferSize > 0. Invalid values could cause 
cryptic errors later during index building.
   ```suggestion
           this.dimension = options.get(VECTOR_DIM);
           if (this.dimension <= 0) {
               throw new IllegalArgumentException("Vector dimension must be 
greater than 0, but was: " + this.dimension);
           }
           this.metric = options.get(VECTOR_METRIC);
           if (!isSupportedMetric(this.metric)) {
               throw new IllegalArgumentException(
                   "Vector metric must be one of COSINE, DOT_PRODUCT, 
EUCLIDEAN, MAX_INNER_PRODUCT, but was: " + this.metric);
           }
           this.m = options.get(VECTOR_M);
           if (this.m <= 0) {
               throw new IllegalArgumentException("Vector m must be greater 
than 0, but was: " + this.m);
           }
           this.efConstruction = options.get(VECTOR_EF_CONSTRUCTION);
           if (this.efConstruction <= 0) {
               throw new IllegalArgumentException("Vector efConstruction must 
be greater than 0, but was: " + this.efConstruction);
           }
           this.sizePerIndex =
                   options.get(VECTOR_SIZE_PER_INDEX) > 0
                           ? options.get(VECTOR_SIZE_PER_INDEX)
                           : VECTOR_SIZE_PER_INDEX.defaultValue();
           this.writeBufferSize = options.get(VECTOR_WRITE_BUFFER_SIZE);
           if (this.writeBufferSize <= 0) {
               throw new IllegalArgumentException("Vector writeBufferSize must 
be greater than 0, but was: " + this.writeBufferSize);
           }
       }
   
       private static boolean isSupportedMetric(String metric) {
           return "COSINE".equals(metric)
                   || "DOT_PRODUCT".equals(metric)
                   || "EUCLIDEAN".equals(metric)
                   || "MAX_INNER_PRODUCT".equals(metric);
       }
   ```



##########
paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.StoredFields;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnByteVectorQuery;
+import org.apache.lucene.search.KnnFloatVectorQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.IndexOutput;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * Vector global index reader using Apache Lucene 9.x.
+ *
+ * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW 
graph for efficient
+ * approximate nearest neighbor search.
+ */
+public class VectorGlobalIndexReader implements GlobalIndexReader {
+
+    private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming
+
+    private final List<IndexSearcher> searchers;
+    private final List<IndexMMapDirectory> directories;
+
+    public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, 
List<GlobalIndexIOMeta> files)
+            throws IOException {
+        this.searchers = new ArrayList<>();
+        this.directories = new ArrayList<>();
+        loadIndices(fileReader, files);
+    }
+
+    /**
+     * Search for similar vectors using Lucene KNN search.
+     *
+     * @param query query vector
+     * @param k number of results
+     * @return global index result containing row IDs
+     */
+    public GlobalIndexResult search(float[] query, int k) {
+        KnnFloatVectorQuery knnQuery = new 
KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    public GlobalIndexResult search(byte[] query, int k) {
+        KnnByteVectorQuery knnQuery = new 
KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k);
+        return search(knnQuery, k);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Close readers
+        for (IndexSearcher searcher : searchers) {
+            searcher.getIndexReader().close();
+        }
+        searchers.clear();
+
+        // Close directories
+        for (IndexMMapDirectory directory : directories) {
+            try {
+                directory.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        directories.clear();
+    }

Review Comment:
   The close() method doesn't handle partial cleanup correctly. If closing one 
IndexReader throws an IOException, the remaining readers and directories won't 
be closed, leading to resource leaks. Consider collecting exceptions and 
ensuring all resources are closed before rethrowing, e.g. by attaching later 
failures to the first one via Throwable.addSuppressed (available since Java 7).



##########
.github/workflows/utitcase-paimon-vector.yml:
##########
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+name: UTCase and ITCase Paimon Vector
+
+on:
+  push:
+  pull_request:
+    paths:
+        - 'paimon-vector/**'
+        - 'pom.xml'
+
+env:
+  JDK_VERSION: 11
+  MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=30 
-Dmaven.wagon.http.retryHandler.requestSentEnabled=true
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.event_name }}-${{ 
github.event.number || github.run_id }}
+  cancel-in-progress: true
+
+jobs:
+  build_test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Set up JDK ${{ env.JDK_VERSION }}
+        uses: actions/setup-java@v4
+        with:
+          java-version: ${{ env.JDK_VERSION }}
+          distribution: 'temurin'
+
+      - name: Build Paimon Vector
+        run:  mvn -T 2C -B clean install -DskipTests -Ppaimon-vector
+
+      - name: Test Paimon Vector
+        run: |
+          . .github/workflows/utils.sh
+          jvm_timezone=$(random_timezone)
+          echo "JVM timezone is set to $jvm_timezone"
+          TEST_MODULE="org.apache.paimon:paimon-vector" 

Review Comment:
   There's trailing whitespace after 
`TEST_MODULE="org.apache.paimon:paimon-vector"`. Consider removing it for 
cleaner code.
   ```suggestion
             TEST_MODULE="org.apache.paimon:paimon-vector"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to