Copilot commented on code in PR #6716: URL: https://github.com/apache/paimon/pull/6716#discussion_r2587959780
########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MMapDirectory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import 
java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. + */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private final List<IndexSearcher> searchers; + private final List<Directory> directories; + private final List<java.nio.file.Path> tempDirs; + + public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + this.searchers = new ArrayList<>(); + this.directories = new ArrayList<>(); + this.tempDirs = new ArrayList<>(); + loadIndices(fileReader, files); + } + + /** + * Search for similar vectors using Lucene KNN search. + * + * @param query query vector + * @param k number of results + * @return global index result containing row IDs + */ + public GlobalIndexResult search(float[] query, int k) { + KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + public GlobalIndexResult search(byte[] query, int k) { + KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + @Override + public void close() throws IOException { + // Close readers + for (IndexSearcher searcher : searchers) { + searcher.getIndexReader().close(); + } + searchers.clear(); + + // Close directories + for (Directory directory : directories) { + directory.close(); + } + directories.clear(); + + // Clean up temp directories + for (java.nio.file.Path tempDir : tempDirs) { + deleteDirectory(tempDir); + } + tempDirs.clear(); + } + + private GlobalIndexResult search(Query query, int k) { + RoaringNavigableMap64 roaringBitmap64 = new 
RoaringNavigableMap64(); + for (IndexSearcher searcher : searchers) { + try { + // Execute search + TopDocs topDocs = searcher.search(query, k); + StoredFields storedFields = searcher.storedFields(); + Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD); + // Collect row IDs from results + for (org.apache.lucene.search.ScoreDoc scoreDoc : topDocs.scoreDocs) { + float rawScore = scoreDoc.score; Review Comment: Unused variable `rawScore`. The score is retrieved from `scoreDoc.score` but never used. If the score is not needed for functionality, this line should be removed to avoid confusion. ```suggestion ``` ########## paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java: ########## @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link VectorGlobalIndexWriter} and {@link VectorGlobalIndexReader}. 
*/ +public class VectorGlobalIndexTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path indexPath; + private DataType vectorType; + + @BeforeEach + public void setup() { + fileIO = new LocalFileIO(); + indexPath = new Path(tempDir.toString()); + vectorType = new ArrayType(new FloatType()); + } + + @AfterEach + public void cleanup() throws IOException { + if (fileIO != null) { + fileIO.delete(indexPath, true); + } + } + + private GlobalIndexFileWriter createFileWriter(Path path) { + return new GlobalIndexFileWriter() { + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + + @Override + public OutputStream newOutputStream(String fileName) throws IOException { + return fileIO.newOutputStream(new Path(path, fileName), false); + } + }; + } + + private GlobalIndexFileReader createFileReader(Path path) { + return fileName -> fileIO.newInputStream(new Path(path, fileName)); + } + + @Test + public void testWriteAndReadVectorIndex() throws IOException { + Options options = createDefaultOptions(); + int dimension = 128; + int numVectors = 100; + + // Create writer and write vectors + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + assertThat(result.fileName()).isNotNull(); + assertThat(result.rowRange()).isEqualTo(new Range(0, numVectors - 1)); + + // Create reader and verify + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + 
result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Test search - query with first vector should return itself + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testSearchSimilarVectors() throws IOException { + Options options = createDefaultOptions(); + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + // Create vectors with known similarities + float[] baseVector = new float[dimension]; + for (int i = 0; i < dimension; i++) { + baseVector[i] = 1.0f; + } + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Write base vector and similar variants + writer.write(new FloatVectorIndex(0, baseVector)); + + for (int i = 1; i < numVectors; i++) { + float[] variant = baseVector.clone(); + // Add small noise + variant[i % dimension] += (i % 2 == 0 ? 
0.1f : -0.1f); + writer.write(new FloatVectorIndex(i, variant)); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with base vector should return similar vectors + GlobalIndexResult searchResult = reader.search(baseVector, 10); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testDifferentSimilarityFunctions() throws IOException { + int dimension = 32; + int numVectors = 20; + + String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"}; + + for (String metric : metrics) { + Options options = createDefaultOptions(); + options.setString("vector.metric", metric); + options.setInteger("vector.dim", dimension); // Set correct dimension + + Path metricIndexPath = new Path(indexPath, metric.toLowerCase()); + GlobalIndexFileWriter fileWriter = createFileWriter(metricIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(metricIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(metricIndexPath, 
result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this metric + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 3); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDifferentDimensions() throws IOException { + int[] dimensions = {8, 32, 128, 256}; + + for (int dimension : dimensions) { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", dimension); + + Path dimIndexPath = new Path(indexPath, "dim_" + dimension); + GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + int numVectors = 10; + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(dimIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(dimIndexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this dimension + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDimensionMismatch() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", 64); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new 
VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Try to write vector with wrong dimension + float[] wrongDimVector = new float[32]; // Wrong dimension + assertThatThrownBy(() -> writer.write(new FloatVectorIndex(0, wrongDimVector))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("dimension mismatch"); + } + + @Test + public void testEmptyIndex() throws IOException { + Options options = createDefaultOptions(); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).isEmpty(); + } + + @Test + public void testHNSWParameters() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.m", 32); // Larger M for better recall + options.setInteger("vector.ef-construction", 200); // Larger ef for better quality + + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify 
search works with custom HNSW parameters + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testLargeK() throws IOException { + Options options = createDefaultOptions(); + int dimension = 32; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 100; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with k larger than number of vectors + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 200); + assertThat(searchResult).isNotNull(); + assertThat(searchResult).isNotNull(); Review Comment: Duplicate assertion: Line 373 repeats the same assertion as line 372 (`assertThat(searchResult).isNotNull()`). This redundant check should be removed. ```suggestion ``` ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.Range; + +import org.apache.lucene.codecs.KnnVectorsFormat; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MMapDirectory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Vector global index writer using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW algorithm for efficient + * approximate nearest neighbor search. 
+ */ +public class VectorGlobalIndexWriter implements GlobalIndexWriter { + + private final GlobalIndexFileWriter fileWriter; + private final DataType fieldType; + private final VectorIndexOptions vectorOptions; + private final VectorSimilarityFunction similarityFunction; + private final int sizePerIndex; + + private final List<VectorIndex> vectors; + + public VectorGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, DataType fieldType, Options options) { + checkArgument( + fieldType instanceof ArrayType, + "Vector field type must be ARRAY, but was: " + fieldType); + this.fileWriter = fileWriter; + this.fieldType = fieldType; + this.vectors = new ArrayList<>(); + this.vectorOptions = new VectorIndexOptions(options); + this.similarityFunction = parseMetricToLucene(vectorOptions.metric()); + this.sizePerIndex = vectorOptions.sizePerIndex(); + } + + @Override + public void write(Object key) { + if (key instanceof FloatVectorIndex) { + FloatVectorIndex vectorKey = (FloatVectorIndex) key; + float[] vector = vectorKey.vector(); + + checkArgument( + vector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + vector.length); + + vectors.add(vectorKey); + } else if (key instanceof ByteVectorIndex) { + ByteVectorIndex vectorKey = (ByteVectorIndex) key; + byte[] byteVector = vectorKey.vector(); + + checkArgument( + byteVector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + byteVector.length); + + vectors.add(vectorKey); + } else { + throw new IllegalArgumentException( + "Unsupported index type: " + key.getClass().getName()); + } + } + + @Override + public List<ResultEntry> finish() { + try { + if (vectors.isEmpty()) { + return new ArrayList<>(); + } + + List<ResultEntry> results = new ArrayList<>(); + + // Split vectors into batches if size exceeds sizePerIndex + int totalVectors = vectors.size(); + int numBatches = 
(int) Math.ceil((double) totalVectors / sizePerIndex); + + for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) { + int startIdx = batchIndex * sizePerIndex; + int endIdx = Math.min(startIdx + sizePerIndex, totalVectors); + List<VectorIndex> batchVectors = vectors.subList(startIdx, endIdx); + + // Build index + byte[] indexBytes = + buildIndex( + batchVectors, + this.vectorOptions.m(), + this.vectorOptions.efConstruction()); + + // Create metadata + VectorIndexMetadata metadata = + new VectorIndexMetadata( + vectorOptions.dimension(), + vectorOptions.metric(), + vectorOptions.m(), + vectorOptions.efConstruction()); + byte[] metaBytes = VectorIndexMetadata.serializeMetadata(metadata); + + // Write to file + String fileName = fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER); + try (OutputStream out = fileWriter.newOutputStream(fileName)) { + out.write(indexBytes); + } + long minRowIdInBatch = batchVectors.get(0).rowId(); + long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 1).rowId(); + results.add( + ResultEntry.of( + fileName, metaBytes, new Range(minRowIdInBatch, maxRowIdInBatch))); + } + + return results; + } catch (IOException e) { + throw new RuntimeException("Failed to write vector global index", e); + } + } + + private VectorSimilarityFunction parseMetricToLucene(String metric) { + switch (metric.toUpperCase()) { + case "COSINE": + return VectorSimilarityFunction.COSINE; + case "DOT_PRODUCT": + return VectorSimilarityFunction.DOT_PRODUCT; + case "EUCLIDEAN": + return VectorSimilarityFunction.EUCLIDEAN; + case "MAX_INNER_PRODUCT": + return VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT; + default: + throw new IllegalArgumentException("Unsupported metric: " + metric); + } + } + + private byte[] buildIndex(List<VectorIndex> batchVectors, int m, int efConstruction) + throws IOException { + // Create temporary directory for MMap + java.nio.file.Path tempDir = + java.nio.file.Files.createTempDirectory( + 
fileWriter.newFileName("paimon-vector-index")); + Directory directory = new MMapDirectory(tempDir); + + try { + // Configure index writer + IndexWriterConfig config = getIndexWriterConfig(m, efConstruction); + + try (IndexWriter writer = new IndexWriter(directory, config)) { + // Add each vector as a document + for (VectorIndex vectorIndex : batchVectors) { + Document doc = new Document(); + + // Add KNN vector field + doc.add(vectorIndex.indexableField(similarityFunction)); + + // Store row ID + doc.add(vectorIndex.rowIdStoredField()); + + writer.addDocument(doc); + } + + // Commit changes + writer.commit(); + } + + // Serialize directory to byte array + return serializeDirectory(directory); + } finally { + // Clean up + directory.close(); + deleteDirectory(tempDir); + } + } + + private static IndexWriterConfig getIndexWriterConfig(int m, int efConstruction) { + IndexWriterConfig config = new IndexWriterConfig(); + config.setRAMBufferSizeMB(256); // Increase buffer for better performance + config.setCodec( + new Lucene912Codec(Lucene912Codec.Mode.BEST_SPEED) { + @Override + public KnnVectorsFormat getKnnVectorsFormatForField(String field) { + return new Lucene99HnswScalarQuantizedVectorsFormat(m, efConstruction); + } + }); + return config; + } + + private void deleteDirectory(java.nio.file.Path path) throws IOException { + if (java.nio.file.Files.exists(path)) { + java.nio.file.Files.walk(path) + .sorted(java.util.Comparator.reverseOrder()) + .forEach( + p -> { + try { + java.nio.file.Files.delete(p); + } catch (IOException e) { + // Ignore cleanup errors + } Review Comment: Silently ignoring cleanup errors: IOException during directory cleanup is caught and ignored without logging. This could hide issues like permission problems or disk errors. Consider logging the exception at least at debug level to aid troubleshooting. 
########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MMapDirectory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import 
java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. + */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private final List<IndexSearcher> searchers; + private final List<Directory> directories; + private final List<java.nio.file.Path> tempDirs; + + public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + this.searchers = new ArrayList<>(); + this.directories = new ArrayList<>(); + this.tempDirs = new ArrayList<>(); + loadIndices(fileReader, files); + } + + /** + * Search for similar vectors using Lucene KNN search. + * + * @param query query vector + * @param k number of results + * @return global index result containing row IDs + */ + public GlobalIndexResult search(float[] query, int k) { + KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + public GlobalIndexResult search(byte[] query, int k) { + KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + @Override + public void close() throws IOException { + // Close readers + for (IndexSearcher searcher : searchers) { + searcher.getIndexReader().close(); + } + searchers.clear(); + + // Close directories + for (Directory directory : directories) { + directory.close(); + } + directories.clear(); + + // Clean up temp directories + for (java.nio.file.Path tempDir : tempDirs) { + deleteDirectory(tempDir); + } + tempDirs.clear(); + } + + private GlobalIndexResult search(Query query, int k) { + RoaringNavigableMap64 roaringBitmap64 = new 
RoaringNavigableMap64(); + for (IndexSearcher searcher : searchers) { + try { + // Execute search + TopDocs topDocs = searcher.search(query, k); + StoredFields storedFields = searcher.storedFields(); + Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD); + // Collect row IDs from results + for (org.apache.lucene.search.ScoreDoc scoreDoc : topDocs.scoreDocs) { + float rawScore = scoreDoc.score; + Document doc = storedFields.document(scoreDoc.doc, fieldsToLoad); + long rowId = doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue(); + roaringBitmap64.add(rowId); + } + } catch (IOException e) { + throw new RuntimeException("Failed to search vector index", e); + } + } + + return GlobalIndexResult.create(() -> roaringBitmap64); + } + + private void loadIndices(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + for (GlobalIndexIOMeta meta : files) { + try (SeekableInputStream in = fileReader.getInputStream(meta.fileName())) { + byte[] indexBytes = new byte[(int) meta.fileSize()]; + int totalRead = 0; + while (totalRead < indexBytes.length) { + int read = in.read(indexBytes, totalRead, indexBytes.length - totalRead); + if (read == -1) { + throw new IOException("Unexpected end of stream"); + } + totalRead += read; + } + + Directory directory = deserializeDirectory(indexBytes); + directories.add(directory); + + IndexReader reader = DirectoryReader.open(directory); + IndexSearcher searcher = new IndexSearcher(reader); + searchers.add(searcher); + } + } + } + + private Directory deserializeDirectory(byte[] data) throws IOException { + // Create temporary directory for MMap + Path tempDir = Files.createTempDirectory("paimon-vector-read" + UUID.randomUUID()); + tempDirs.add(tempDir); + Directory directory = new MMapDirectory(tempDir); + + ByteBuffer buffer = ByteBuffer.wrap(data); + + // Read number of files + int numFiles = buffer.getInt(); + + for (int i = 0; i < numFiles; i++) { + // Read file name + int nameLength = 
buffer.getInt(); + byte[] nameBytes = new byte[nameLength]; + buffer.get(nameBytes); + String fileName = new String(nameBytes, StandardCharsets.UTF_8); + + // Read file content + long fileLength = buffer.getLong(); + byte[] fileContent = new byte[(int) fileLength]; + buffer.get(fileContent); + + // Write to directory + try (IndexOutput output = directory.createOutput(fileName, null)) { + output.writeBytes(fileContent, 0, fileContent.length); + } + } + + return directory; + } + + private void deleteDirectory(java.nio.file.Path path) throws IOException { + if (java.nio.file.Files.exists(path)) { + java.nio.file.Files.walk(path) + .sorted(java.util.Comparator.reverseOrder()) + .forEach( + p -> { + try { + java.nio.file.Files.delete(p); + } catch (IOException e) { + // Ignore cleanup errors + } Review Comment: Silently ignoring cleanup errors: IOException during directory cleanup is caught and ignored without logging. This could hide issues like permission problems or disk errors. Consider logging the exception at least at debug level to aid troubleshooting. ########## paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java: ########## @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link VectorGlobalIndexWriter} and {@link VectorGlobalIndexReader}. 
*/ +public class VectorGlobalIndexTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path indexPath; + private DataType vectorType; + + @BeforeEach + public void setup() { + fileIO = new LocalFileIO(); + indexPath = new Path(tempDir.toString()); + vectorType = new ArrayType(new FloatType()); + } + + @AfterEach + public void cleanup() throws IOException { + if (fileIO != null) { + fileIO.delete(indexPath, true); + } + } + + private GlobalIndexFileWriter createFileWriter(Path path) { + return new GlobalIndexFileWriter() { + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + + @Override + public OutputStream newOutputStream(String fileName) throws IOException { + return fileIO.newOutputStream(new Path(path, fileName), false); + } + }; + } + + private GlobalIndexFileReader createFileReader(Path path) { + return fileName -> fileIO.newInputStream(new Path(path, fileName)); + } + + @Test + public void testWriteAndReadVectorIndex() throws IOException { + Options options = createDefaultOptions(); + int dimension = 128; + int numVectors = 100; + + // Create writer and write vectors + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + assertThat(result.fileName()).isNotNull(); + assertThat(result.rowRange()).isEqualTo(new Range(0, numVectors - 1)); + + // Create reader and verify + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + 
result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Test search - query with first vector should return itself + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testSearchSimilarVectors() throws IOException { + Options options = createDefaultOptions(); + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + // Create vectors with known similarities + float[] baseVector = new float[dimension]; + for (int i = 0; i < dimension; i++) { + baseVector[i] = 1.0f; + } + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Write base vector and similar variants + writer.write(new FloatVectorIndex(0, baseVector)); + + for (int i = 1; i < numVectors; i++) { + float[] variant = baseVector.clone(); + // Add small noise + variant[i % dimension] += (i % 2 == 0 ? 
0.1f : -0.1f); + writer.write(new FloatVectorIndex(i, variant)); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with base vector should return similar vectors + GlobalIndexResult searchResult = reader.search(baseVector, 10); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testDifferentSimilarityFunctions() throws IOException { + int dimension = 32; + int numVectors = 20; + + String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"}; + + for (String metric : metrics) { + Options options = createDefaultOptions(); + options.setString("vector.metric", metric); + options.setInteger("vector.dim", dimension); // Set correct dimension + + Path metricIndexPath = new Path(indexPath, metric.toLowerCase()); + GlobalIndexFileWriter fileWriter = createFileWriter(metricIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(metricIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(metricIndexPath, 
result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this metric + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 3); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDifferentDimensions() throws IOException { + int[] dimensions = {8, 32, 128, 256}; + + for (int dimension : dimensions) { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", dimension); + + Path dimIndexPath = new Path(indexPath, "dim_" + dimension); + GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + int numVectors = 10; + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(dimIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(dimIndexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this dimension + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDimensionMismatch() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", 64); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new 
VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Try to write vector with wrong dimension + float[] wrongDimVector = new float[32]; // Wrong dimension + assertThatThrownBy(() -> writer.write(new FloatVectorIndex(0, wrongDimVector))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("dimension mismatch"); + } + + @Test + public void testEmptyIndex() throws IOException { + Options options = createDefaultOptions(); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).isEmpty(); + } + + @Test + public void testHNSWParameters() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.m", 32); // Larger M for better recall + options.setInteger("vector.ef-construction", 200); // Larger ef for better quality + + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify 
search works with custom HNSW parameters + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testLargeK() throws IOException { + Options options = createDefaultOptions(); + int dimension = 32; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 100; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with k larger than number of vectors + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 200); + assertThat(searchResult).isNotNull(); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testEndToEndMultiShardScenario() throws IOException { + Options options = createDefaultOptions(); + int dimension = 128; + options.setInteger("vector.dim", dimension); + options.setString("vector.metric", "COSINE"); + options.setInteger("vector.m", 24); + options.setInteger("vector.ef-construction", 150); + + // Simulate a multi-shard scenario with incremental writes + int totalVectors = 300; + int shardsCount = 3; + int vectorsPerShard = totalVectors / shardsCount; 
+ + List<GlobalIndexIOMeta> allMetas = new ArrayList<>(); + List<float[]> allTestVectors = generateRandomVectors(totalVectors, dimension); + + // Write vectors in multiple shards (simulating incremental indexing) + for (int shardIdx = 0; shardIdx < shardsCount; shardIdx++) { + Path shardPath = new Path(indexPath, "shard_" + shardIdx); + GlobalIndexFileWriter fileWriter = createFileWriter(shardPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + int startIdx = shardIdx * vectorsPerShard; + int endIdx = Math.min(startIdx + vectorsPerShard, totalVectors); + + // Write vectors for this shard + for (int i = startIdx; i < endIdx; i++) { + writer.write(new FloatVectorIndex(i, allTestVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + allMetas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(shardPath, result.fileName())), + result.rowRange(), + result.meta())); + } + + // Create a single reader for all shards + GlobalIndexFileReader fileReader = + new GlobalIndexFileReader() { + @Override + public org.apache.paimon.fs.SeekableInputStream getInputStream(String fileName) + throws IOException { + // Try to find the file in any shard directory + for (int shardIdx = 0; shardIdx < shardsCount; shardIdx++) { + Path shardPath = new Path(indexPath, "shard_" + shardIdx); + Path filePath = new Path(shardPath, fileName); + if (fileIO.exists(filePath)) { + return fileIO.newInputStream(filePath); + } + } + throw new IOException("File not found: " + fileName); + } + }; + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, allMetas); + + // Test 1: Search with various k values + int[] kValues = {1, 5, 10, 20, 50}; + for (int k : kValues) { + GlobalIndexResult searchResult = reader.search(allTestVectors.get(0), k); + 
assertThat(searchResult).isNotNull(); + assertThat(containsRowId(searchResult, 0)).isTrue(); + } + + // Test 2: Search with query vectors from different shards + for (int shardIdx = 0; shardIdx < shardsCount; shardIdx++) { + int queryIdx = shardIdx * vectorsPerShard + 5; + GlobalIndexResult searchResult = reader.search(allTestVectors.get(queryIdx), 10); + assertThat(searchResult).isNotNull(); + assertThat(containsRowId(searchResult, queryIdx)) + .as("Search result should contain the query vector's own ID") + .isTrue(); + } + + // Test 3: Create a known similar vector and verify top result + float[] originalVector = allTestVectors.get(50); + float[] slightlyModifiedVector = originalVector.clone(); + // Add minimal noise to create a very similar vector + for (int i = 0; i < dimension; i++) { + slightlyModifiedVector[i] += (i % 2 == 0 ? 0.001f : -0.001f); + } + normalize(slightlyModifiedVector); + + GlobalIndexResult similarSearchResult = reader.search(slightlyModifiedVector, 5); + assertThat(similarSearchResult).isNotNull(); + assertThat(containsRowId(similarSearchResult, 50)) + .as("Search with very similar vector should find the original") + .isTrue(); + + // Test 4: Verify search quality with a synthetic cluster + // Create a cluster center + float[] clusterCenter = new float[dimension]; + Random random = new Random(123); + for (int i = 0; i < dimension; i++) { + clusterCenter[i] = random.nextFloat() * 2 - 1; + } + normalize(clusterCenter); + + GlobalIndexResult clusterSearchResult = reader.search(clusterCenter, 20); + assertThat(clusterSearchResult).isNotNull(); + assertThat(clusterSearchResult.results().iterator().hasNext()).isTrue(); + + // Test 5: Edge case - search with k larger than total vectors + GlobalIndexResult largeKResult = reader.search(allTestVectors.get(10), totalVectors * 2); + assertThat(largeKResult).isNotNull(); + + // Test 6: Multiple consecutive searches (stress test) + for (int i = 0; i < 20; i++) { + int randomIdx = 
random.nextInt(totalVectors); Review Comment: Possibly out-of-scope `random` variable. Line 478 creates a `Random` instance named `random` with seed 123, and line 494 calls `random.nextInt(totalVectors)`. If that declaration is not in scope at line 494, this will cause a compilation error. Either reuse the `random` instance from line 478 or create a new one before this line. ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MMapDirectory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. 
+ */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private final List<IndexSearcher> searchers; + private final List<Directory> directories; + private final List<java.nio.file.Path> tempDirs; + + public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + this.searchers = new ArrayList<>(); + this.directories = new ArrayList<>(); + this.tempDirs = new ArrayList<>(); + loadIndices(fileReader, files); + } + + /** + * Search for similar vectors using Lucene KNN search. + * + * @param query query vector + * @param k number of results + * @return global index result containing row IDs + */ + public GlobalIndexResult search(float[] query, int k) { + KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + public GlobalIndexResult search(byte[] query, int k) { + KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + @Override + public void close() throws IOException { + // Close readers + for (IndexSearcher searcher : searchers) { + searcher.getIndexReader().close(); + } + searchers.clear(); + + // Close directories + for (Directory directory : directories) { + directory.close(); + } + directories.clear(); + + // Clean up temp directories + for (java.nio.file.Path tempDir : tempDirs) { + deleteDirectory(tempDir); + } + tempDirs.clear(); + } + + private GlobalIndexResult search(Query query, int k) { + RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64(); + for (IndexSearcher searcher : searchers) { + try { + // Execute search + TopDocs topDocs = searcher.search(query, k); + StoredFields storedFields = searcher.storedFields(); + Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD); + // Collect row IDs from results + for (org.apache.lucene.search.ScoreDoc scoreDoc : topDocs.scoreDocs) { + float rawScore = scoreDoc.score; + 
Document doc = storedFields.document(scoreDoc.doc, fieldsToLoad); + long rowId = doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue(); + roaringBitmap64.add(rowId); + } + } catch (IOException e) { + throw new RuntimeException("Failed to search vector index", e); + } + } + + return GlobalIndexResult.create(() -> roaringBitmap64); + } + + private void loadIndices(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + for (GlobalIndexIOMeta meta : files) { + try (SeekableInputStream in = fileReader.getInputStream(meta.fileName())) { + byte[] indexBytes = new byte[(int) meta.fileSize()]; Review Comment: Inefficient buffer allocation: The entire index file is loaded into memory at once using `new byte[(int) meta.fileSize()]`. For large vector indices, this could lead to OutOfMemoryError. Consider streaming the data or reading in chunks to reduce memory pressure. ########## paimon-vector/src/main/java/org/apache/paimon/vector/ByteVectorIndex.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.lucene.document.KnnByteVectorField; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.VectorSimilarityFunction; + +/** Vector index for byte vector. */ +public class ByteVectorIndex extends VectorIndex { + private final long rowId; + private final byte[] vector; + + public ByteVectorIndex(long rowId, byte[] vector) { + this.rowId = rowId; + this.vector = vector; + } + Review Comment: Missing `@Override` annotation: The `rowId()` method overrides a method from the parent class `VectorIndex` but is missing the `@Override` annotation. This should be added for consistency with the `indexableField()` method at line 39 and with `FloatVectorIndex.java`. ```suggestion @Override ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
