Copilot commented on code in PR #6716:
URL: https://github.com/apache/paimon/pull/6716#discussion_r2588132965
########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.Range; + +import org.apache.lucene.codecs.KnnVectorsFormat; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MMapDirectory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Vector global index writer using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW algorithm for efficient + * approximate nearest neighbor search. 
+ */ +public class VectorGlobalIndexWriter implements GlobalIndexWriter { + + private final GlobalIndexFileWriter fileWriter; + private final DataType fieldType; + private final VectorIndexOptions vectorOptions; + private final VectorSimilarityFunction similarityFunction; + private final int sizePerIndex; + private final int writeBufferSize; + + private final List<VectorIndex> vectors; + + public VectorGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, DataType fieldType, Options options) { + checkArgument( + fieldType instanceof ArrayType, + "Vector field type must be ARRAY, but was: " + fieldType); + this.fileWriter = fileWriter; + this.fieldType = fieldType; + this.vectors = new ArrayList<>(); + this.vectorOptions = new VectorIndexOptions(options); + this.similarityFunction = parseMetricToLucene(vectorOptions.metric()); + this.sizePerIndex = vectorOptions.sizePerIndex(); + this.writeBufferSize = vectorOptions.writeBufferSize(); + } + + @Override + public void write(Object key) { + if (key instanceof FloatVectorIndex) { + FloatVectorIndex vectorKey = (FloatVectorIndex) key; + float[] vector = vectorKey.vector(); + + checkArgument( + vector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + vector.length); + + vectors.add(vectorKey); + } else if (key instanceof ByteVectorIndex) { + ByteVectorIndex vectorKey = (ByteVectorIndex) key; + byte[] byteVector = vectorKey.vector(); + + checkArgument( + byteVector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + byteVector.length); + + vectors.add(vectorKey); + } else { + throw new IllegalArgumentException( + "Unsupported index type: " + key.getClass().getName()); + } + } + + @Override + public List<ResultEntry> finish() { + try { + if (vectors.isEmpty()) { + return new ArrayList<>(); + } + + List<ResultEntry> results = new ArrayList<>(); + + // Split vectors into batches if size exceeds sizePerIndex + int totalVectors = vectors.size(); + int numBatches = (int) Math.ceil((double) totalVectors / sizePerIndex); + + for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) { + int startIdx = batchIndex * sizePerIndex; + int endIdx = Math.min(startIdx + sizePerIndex, totalVectors); + List<VectorIndex> batchVectors = vectors.subList(startIdx, endIdx); + + // Build index + byte[] indexBytes = + buildIndex( + batchVectors, + this.vectorOptions.m(), + this.vectorOptions.efConstruction(), + this.vectorOptions.writeBufferSize()); + + // Create metadata + VectorIndexMetadata metadata = + new VectorIndexMetadata( + vectorOptions.dimension(), + vectorOptions.metric(), + vectorOptions.m(), + vectorOptions.efConstruction()); + byte[] metaBytes = VectorIndexMetadata.serializeMetadata(metadata); + + // Write to file + String fileName = fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER); + try (OutputStream out = fileWriter.newOutputStream(fileName)) { + out.write(indexBytes); + } + long minRowIdInBatch = batchVectors.get(0).rowId(); + long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 1).rowId(); + results.add( + ResultEntry.of( + fileName, metaBytes, new Range(minRowIdInBatch, maxRowIdInBatch))); + } + + return results; + } catch (IOException e) { + throw new RuntimeException("Failed to write vector global index", e); + } + } + + private VectorSimilarityFunction parseMetricToLucene(String metric) { + switch (metric.toUpperCase()) { + case "COSINE": + return 
VectorSimilarityFunction.COSINE; + case "DOT_PRODUCT": + return VectorSimilarityFunction.DOT_PRODUCT; + case "EUCLIDEAN": + return VectorSimilarityFunction.EUCLIDEAN; + case "MAX_INNER_PRODUCT": + return VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT; + default: + throw new IllegalArgumentException("Unsupported metric: " + metric); + } + } + + private byte[] buildIndex( + List<VectorIndex> batchVectors, int m, int efConstruction, int writeBufferSize) + throws IOException { + // Create temporary directory for MMap + java.nio.file.Path tempDir = + java.nio.file.Files.createTempDirectory( + fileWriter.newFileName("paimon-vector-index")); + Directory directory = new MMapDirectory(tempDir); + + try { + // Configure index writer + IndexWriterConfig config = getIndexWriterConfig(m, efConstruction, writeBufferSize); + + try (IndexWriter writer = new IndexWriter(directory, config)) { + // Add each vector as a document + for (VectorIndex vectorIndex : batchVectors) { + Document doc = new Document(); + + // Add KNN vector field + doc.add(vectorIndex.indexableField(similarityFunction)); + + // Store row ID + doc.add(vectorIndex.rowIdStoredField()); + + writer.addDocument(doc); + } + + // Commit changes + writer.commit(); + } + + // Serialize directory to byte array + return serializeDirectory(directory); + } finally { + // Clean up + directory.close(); + deleteDirectory(tempDir); + } + } + + private static IndexWriterConfig getIndexWriterConfig( + int m, int efConstruction, int writeBufferSize) { + IndexWriterConfig config = new IndexWriterConfig(); + config.setRAMBufferSizeMB(writeBufferSize); // Increase buffer for better performance Review Comment: The comment says "Increase buffer for better performance" but this is setting the buffer to a configurable value, not necessarily increasing it. The comment should reflect that this is configuring the RAM buffer size based on user settings. ```suggestion config.setRAMBufferSizeMB(writeBufferSize); // Configure RAM buffer size based on user settings ``` ########## paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link VectorGlobalIndexWriter} and {@link VectorGlobalIndexReader}. */ +public class VectorGlobalIndexTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path indexPath; + private DataType vectorType; + + @BeforeEach + public void setup() { + fileIO = new LocalFileIO(); + indexPath = new Path(tempDir.toString()); + vectorType = new ArrayType(new FloatType()); + } + + @AfterEach + public void cleanup() throws IOException { + if (fileIO != null) { + fileIO.delete(indexPath, true); + } + } + + private GlobalIndexFileWriter createFileWriter(Path path) { + return new GlobalIndexFileWriter() { + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + + @Override + public OutputStream newOutputStream(String fileName) throws IOException { + return fileIO.newOutputStream(new Path(path, fileName), false); + } + }; + } + + private GlobalIndexFileReader createFileReader(Path path) { + return fileName -> fileIO.newInputStream(new Path(path, fileName)); + } + + @Test + public void testWriteAndReadVectorIndex() throws IOException { + Options options = createDefaultOptions(); + int dimension = 128; + int numVectors = 100; + + // Create writer and write vectors + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + assertThat(result.fileName()).isNotNull(); + assertThat(result.rowRange()).isEqualTo(new Range(0, numVectors - 1)); + + // Create reader and verify + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Test search - query with first vector should return itself + GlobalIndexResult 
searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testSearchSimilarVectors() throws IOException { + Options options = createDefaultOptions(); + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + // Create vectors with known similarities + float[] baseVector = new float[dimension]; + for (int i = 0; i < dimension; i++) { + baseVector[i] = 1.0f; + } + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Write base vector and similar variants + writer.write(new FloatVectorIndex(0, baseVector)); + + for (int i = 1; i < numVectors; i++) { + float[] variant = baseVector.clone(); + // Add small noise + variant[i % dimension] += (i % 2 == 0 ? 0.1f : -0.1f); + writer.write(new FloatVectorIndex(i, variant)); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with base vector should return similar vectors + GlobalIndexResult searchResult = reader.search(baseVector, 10); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testDifferentSimilarityFunctions() throws IOException { + int dimension = 32; + int numVectors = 20; + + String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"}; + + for (String metric : metrics) { + Options options = createDefaultOptions(); + options.setString("vector.metric", metric); + options.setInteger("vector.dim", dimension); // Set correct dimension + + Path metricIndexPath = new Path(indexPath, metric.toLowerCase()); + GlobalIndexFileWriter fileWriter = createFileWriter(metricIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(metricIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(metricIndexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this metric + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 3); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDifferentDimensions() throws IOException { + int[] dimensions = {8, 32, 128, 256}; + + for (int dimension : dimensions) { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", dimension); + + Path dimIndexPath = new Path(indexPath, "dim_" + dimension); + 
GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + int numVectors = 10; + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(dimIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(dimIndexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this dimension + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDimensionMismatch() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", 64); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Try to write vector with wrong dimension + float[] wrongDimVector = new float[32]; // Wrong dimension + assertThatThrownBy(() -> writer.write(new FloatVectorIndex(0, wrongDimVector))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("dimension mismatch"); + } + + @Test + public void testEmptyIndex() throws IOException { + Options options = createDefaultOptions(); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).isEmpty(); + } + + @Test + public void testHNSWParameters() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.m", 32); // Larger M for better recall + options.setInteger("vector.ef-construction", 200); // Larger ef for better quality + + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with custom HNSW parameters + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void 
testLargeK() throws IOException { + Options options = createDefaultOptions(); + int dimension = 32; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 100; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with k larger than number of vectors + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 200); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testVectorIndexEndToEnd() throws IOException { + Options options = createDefaultOptions(); + int dimension = 2; + options.setInteger("vector.dim", dimension); + options.setString("vector.metric", "EUCLIDEAN"); + options.setInteger("vector.m", 16); + options.setInteger("vector.ef-construction", 100); + options.setInteger("vector.size-per-index", 3); + + // Create semantic vectors representing "documents" + // Apple [0.9, 0.1] and Banana [0.8, 0.2] and Orange [0.85, 0.15] are similar (fruits) + // Car [0.1, 0.9], Bike [0.15, 0.85], and Truck [0.2, 0.8] are different (vehicles) Review Comment: The comment says "Apple [0.9, 0.1] and Banana [0.8, 0.2] and Orange [0.85, 0.15] are similar (fruits)" but this could be misleading because the similarity actually depends on the metric being used (EUCLIDEAN in this test). The comment describes conceptual similarity but doesn't clarify that these are close in Euclidean distance. ```suggestion // Apple [0.9, 0.1], Banana [0.8, 0.2], and Orange [0.85, 0.15] are close in Euclidean distance (fruits) // Car [0.1, 0.9], Bike [0.15, 0.85], and Truck [0.2, 0.8] are close to each other in Euclidean distance (vehicles), but far from the fruit vectors. ``` ########## paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link VectorGlobalIndexWriter} and {@link VectorGlobalIndexReader}. */ +public class VectorGlobalIndexTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path indexPath; + private DataType vectorType; + + @BeforeEach + public void setup() { + fileIO = new LocalFileIO(); + indexPath = new Path(tempDir.toString()); + vectorType = new ArrayType(new FloatType()); + } + + @AfterEach + public void cleanup() throws IOException { + if (fileIO != null) { + fileIO.delete(indexPath, true); + } + } + + private GlobalIndexFileWriter createFileWriter(Path path) { + return new GlobalIndexFileWriter() { + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + + @Override + public OutputStream newOutputStream(String fileName) throws IOException { + return fileIO.newOutputStream(new Path(path, fileName), false); + } + }; + } + + private GlobalIndexFileReader createFileReader(Path path) { + return fileName -> fileIO.newInputStream(new Path(path, fileName)); + } + + @Test + public void testWriteAndReadVectorIndex() throws IOException { + Options options = createDefaultOptions(); + int dimension = 128; + int numVectors = 100; + + // Create writer and write vectors + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + assertThat(result.fileName()).isNotNull(); + assertThat(result.rowRange()).isEqualTo(new Range(0, numVectors - 1)); + + // Create reader and verify + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Test search - query with first vector should return itself + GlobalIndexResult 
searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testSearchSimilarVectors() throws IOException { + Options options = createDefaultOptions(); + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + // Create vectors with known similarities + float[] baseVector = new float[dimension]; + for (int i = 0; i < dimension; i++) { + baseVector[i] = 1.0f; + } + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Write base vector and similar variants + writer.write(new FloatVectorIndex(0, baseVector)); + + for (int i = 1; i < numVectors; i++) { + float[] variant = baseVector.clone(); + // Add small noise + variant[i % dimension] += (i % 2 == 0 ? 0.1f : -0.1f); + writer.write(new FloatVectorIndex(i, variant)); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with base vector should return similar vectors + GlobalIndexResult searchResult = reader.search(baseVector, 10); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testDifferentSimilarityFunctions() throws IOException { + int dimension = 32; + int numVectors = 20; + + String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"}; + + for (String metric : metrics) { + Options options = createDefaultOptions(); + options.setString("vector.metric", metric); + options.setInteger("vector.dim", dimension); // Set correct dimension + + Path metricIndexPath = new Path(indexPath, metric.toLowerCase()); + GlobalIndexFileWriter fileWriter = createFileWriter(metricIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(metricIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(metricIndexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this metric + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 3); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDifferentDimensions() throws IOException { + int[] dimensions = {8, 32, 128, 256}; + + for (int dimension : dimensions) { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", dimension); + + Path dimIndexPath = new Path(indexPath, "dim_" + dimension); + 
GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + int numVectors = 10; + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(dimIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(dimIndexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this dimension + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDimensionMismatch() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", 64); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Try to write vector with wrong dimension + float[] wrongDimVector = new float[32]; // Wrong dimension + assertThatThrownBy(() -> writer.write(new FloatVectorIndex(0, wrongDimVector))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("dimension mismatch"); + } + + @Test + public void testEmptyIndex() throws IOException { + Options options = createDefaultOptions(); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).isEmpty(); + } + + @Test + public void testHNSWParameters() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.m", 32); // Larger M for better recall + options.setInteger("vector.ef-construction", 200); // Larger ef for better quality + + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with custom HNSW parameters + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void 
testLargeK() throws IOException { + Options options = createDefaultOptions(); + int dimension = 32; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 100; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with k larger than number of vectors + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 200); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testVectorIndexEndToEnd() throws IOException { + Options options = createDefaultOptions(); + int dimension = 2; + options.setInteger("vector.dim", dimension); + options.setString("vector.metric", "EUCLIDEAN"); + options.setInteger("vector.m", 16); + options.setInteger("vector.ef-construction", 100); + options.setInteger("vector.size-per-index", 3); + + // Create semantic vectors representing "documents" + // Apple [0.9, 0.1] and Banana [0.8, 0.2] and Orange [0.85, 0.15] are similar (fruits) + // Car [0.1, 0.9], Bike [0.15, 0.85], and Truck [0.2, 0.8] are different (vehicles) + float[] appleVector = new float[] {0.9f, 0.1f}; + float[] bananaVector = new float[] {0.8f, 0.2f}; + float[] carVector = new float[] {0.1f, 0.9f}; + float[] orangeVector = new float[] {0.85f, 0.15f}; + float[] bikeVector = new float[] {0.15f, 0.85f}; + float[] truckVector = new float[] {0.2f, 0.8f}; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Write vectors with row IDs representing documents + long appleId = 0L; + long bananaId = 1L; + long carId = 2L; + long orangeId = 3L; + long bikeId = 4L; + long truckId = 5L; + + writer.write(new FloatVectorIndex(appleId, appleVector)); + writer.write(new FloatVectorIndex(bananaId, bananaVector)); + writer.write(new FloatVectorIndex(carId, carVector)); + writer.write(new FloatVectorIndex(orangeId, orangeVector)); + writer.write(new FloatVectorIndex(bikeId, bikeVector)); + writer.write(new FloatVectorIndex(truckId, truckVector)); + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(2); + + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + for (GlobalIndexWriter.ResultEntry result : results) { + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + } + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Test 1: Query with vector similar to "Apple" - should find Apple, Banana, and Orange + // Query: [0.85, 0.15] is between Apple and Banana 
+ float[] queryVector = new float[] {0.85f, 0.15f}; + GlobalIndexResult searchResult = reader.search(queryVector, 1); + + assertThat(searchResult).isNotNull(); + assertThat(searchResult.results().iterator().hasNext()) + .as("Search should return results") + .isTrue(); + + // Collect all result IDs + List<Long> resultIds = new ArrayList<>(); + searchResult.results().iterator().forEachRemaining(resultIds::add); + + // Should find Apple, Banana, and Orange (similar fruits), not vehicles + assertThat(resultIds).as("Should find fruits").containsAnyOf(appleId, bananaId, orangeId); + assertThat(resultIds.size()).as("Should return top 3 results").isLessThanOrEqualTo(3); + + // Test 2: Query with exact match - should find exact document + GlobalIndexResult exactMatchResult = reader.search(appleVector, 1); + assertThat(containsRowId(exactMatchResult, appleId)) + .as("Exact match query should find Apple") + .isTrue(); + + // Test 3: Query with vector similar to "Car" - should find Car first + float[] carQueryVector = new float[] {0.15f, 0.85f}; + GlobalIndexResult carSearchResult = reader.search(carQueryVector, 1); + + List<Long> carResultIds = new ArrayList<>(); + carSearchResult.results().iterator().forEachRemaining(carResultIds::add); + + assertThat(carResultIds).as("Query similar to Car should find Car").contains(carId); Review Comment: The test comment and assertion are incorrect. The query vector [0.15f, 0.85f] is exactly equal to the Bike vector [0.15f, 0.85f] (line 394), not the Car vector [0.1f, 0.9f] (line 392). The test should either: (1) change the comment and assertion to expect Bike, or (2) change the query vector to be closer to Car, such as [0.12f, 0.88f]. ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorIndexMetadata.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; + +/** Metadata for the vector index, including dimension, similarity function, and HNSW parameters. 
*/ +public class VectorIndexMetadata implements Serializable { + + private static final long serialVersionUID = 1L; + + private final int dimension; + private final String similarityFunction; + private final int m; + private final int efConstruction; + + public VectorIndexMetadata( + int dimension, String similarityFunction, int m, int efConstruction) { + this.dimension = dimension; + this.similarityFunction = similarityFunction; + this.m = m; + this.efConstruction = efConstruction; + } + + public static byte[] serializeMetadata(VectorIndexMetadata vectorIndexMetadata) + throws IOException { + ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); + try (DataOutputStream dataOut = new DataOutputStream(byteOut)) { + dataOut.writeInt(vectorIndexMetadata.dimension()); + dataOut.writeUTF(vectorIndexMetadata.similarityFunction()); + dataOut.writeInt(vectorIndexMetadata.m()); + dataOut.writeInt(vectorIndexMetadata.efConstruction()); + } + return byteOut.toByteArray(); + } + Review Comment: The `serializeMetadata` method only serializes metadata but there's no corresponding `deserializeMetadata` method. This creates an asymmetric API and may cause confusion. Consider adding a matching deserialization method for consistency and future use. ```suggestion public static VectorIndexMetadata deserializeMetadata(byte[] data) throws IOException { try (java.io.DataInputStream dataIn = new java.io.DataInputStream(new java.io.ByteArrayInputStream(data))) { int dimension = dataIn.readInt(); String similarityFunction = dataIn.readUTF(); int m = dataIn.readInt(); int efConstruction = dataIn.readInt(); return new VectorIndexMetadata(dimension, similarityFunction, m, efConstruction); } } ``` ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.Range; + +import org.apache.lucene.codecs.KnnVectorsFormat; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MMapDirectory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Vector global index writer using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW algorithm for efficient + * approximate nearest neighbor search. + */ +public class VectorGlobalIndexWriter implements GlobalIndexWriter { + + private final GlobalIndexFileWriter fileWriter; + private final DataType fieldType; + private final VectorIndexOptions vectorOptions; + private final VectorSimilarityFunction similarityFunction; + private final int sizePerIndex; + private final int writeBufferSize; + + private final List<VectorIndex> vectors; + + public VectorGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, DataType fieldType, Options options) { + checkArgument( + fieldType instanceof ArrayType, + "Vector field type must be ARRAY, but was: " + fieldType); + this.fileWriter = fileWriter; + this.fieldType = fieldType; + this.vectors = new ArrayList<>(); + this.vectorOptions = new VectorIndexOptions(options); + this.similarityFunction = parseMetricToLucene(vectorOptions.metric()); + this.sizePerIndex = vectorOptions.sizePerIndex(); + this.writeBufferSize = vectorOptions.writeBufferSize(); + } + + @Override + public void write(Object key) { + if (key instanceof FloatVectorIndex) { + FloatVectorIndex vectorKey = (FloatVectorIndex) key; + float[] vector = vectorKey.vector(); + + checkArgument( + vector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + vector.length); + + vectors.add(vectorKey); + } else if (key instanceof ByteVectorIndex) { + ByteVectorIndex vectorKey = (ByteVectorIndex) key; + byte[] byteVector = vectorKey.vector(); + + checkArgument( + byteVector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + byteVector.length); + + vectors.add(vectorKey); + } else { + throw new IllegalArgumentException( + "Unsupported index type: " + key.getClass().getName()); + } + } + + @Override + public List<ResultEntry> finish() { + try { + if (vectors.isEmpty()) { + return new ArrayList<>(); + } + + List<ResultEntry> results = new ArrayList<>(); + + // Split vectors into batches if size exceeds sizePerIndex + int totalVectors = vectors.size(); + int numBatches = (int) Math.ceil((double) totalVectors / sizePerIndex); + + for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) { + int startIdx = batchIndex * sizePerIndex; 
+ int endIdx = Math.min(startIdx + sizePerIndex, totalVectors); + List<VectorIndex> batchVectors = vectors.subList(startIdx, endIdx); + + // Build index + byte[] indexBytes = + buildIndex( + batchVectors, + this.vectorOptions.m(), + this.vectorOptions.efConstruction(), + this.vectorOptions.writeBufferSize()); + + // Create metadata + VectorIndexMetadata metadata = + new VectorIndexMetadata( + vectorOptions.dimension(), + vectorOptions.metric(), + vectorOptions.m(), + vectorOptions.efConstruction()); + byte[] metaBytes = VectorIndexMetadata.serializeMetadata(metadata); + + // Write to file + String fileName = fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER); + try (OutputStream out = fileWriter.newOutputStream(fileName)) { + out.write(indexBytes); + } + long minRowIdInBatch = batchVectors.get(0).rowId(); + long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 1).rowId(); + results.add( + ResultEntry.of( + fileName, metaBytes, new Range(minRowIdInBatch, maxRowIdInBatch))); + } + + return results; + } catch (IOException e) { + throw new RuntimeException("Failed to write vector global index", e); + } + } + + private VectorSimilarityFunction parseMetricToLucene(String metric) { + switch (metric.toUpperCase()) { + case "COSINE": + return VectorSimilarityFunction.COSINE; + case "DOT_PRODUCT": + return VectorSimilarityFunction.DOT_PRODUCT; + case "EUCLIDEAN": + return VectorSimilarityFunction.EUCLIDEAN; + case "MAX_INNER_PRODUCT": + return VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT; + default: + throw new IllegalArgumentException("Unsupported metric: " + metric); + } + } + + private byte[] buildIndex( + List<VectorIndex> batchVectors, int m, int efConstruction, int writeBufferSize) + throws IOException { + // Create temporary directory for MMap + java.nio.file.Path tempDir = + java.nio.file.Files.createTempDirectory( + fileWriter.newFileName("paimon-vector-index")); + Directory directory = new MMapDirectory(tempDir); + + try { + // Configure index writer + IndexWriterConfig config = getIndexWriterConfig(m, efConstruction, writeBufferSize); + + try (IndexWriter writer = new IndexWriter(directory, config)) { + // Add each vector as a document + for (VectorIndex vectorIndex : batchVectors) { + Document doc = new Document(); + + // Add KNN vector field + doc.add(vectorIndex.indexableField(similarityFunction)); + + // Store row ID + doc.add(vectorIndex.rowIdStoredField()); + + writer.addDocument(doc); + } + + // Commit changes + writer.commit(); + } + + // Serialize directory to byte array + return serializeDirectory(directory); + } finally { + // Clean up + directory.close(); + deleteDirectory(tempDir); + } + } + + private static IndexWriterConfig getIndexWriterConfig( + int m, int efConstruction, int writeBufferSize) { + IndexWriterConfig config = new IndexWriterConfig(); + config.setRAMBufferSizeMB(writeBufferSize); // Increase buffer for better performance + config.setCodec( + new Lucene912Codec(Lucene912Codec.Mode.BEST_SPEED) { + @Override + public KnnVectorsFormat getKnnVectorsFormatForField(String field) { + return new Lucene99HnswScalarQuantizedVectorsFormat(m, efConstruction); + } + }); + return config; + } + + private void deleteDirectory(java.nio.file.Path path) throws IOException { + if (java.nio.file.Files.exists(path)) { + java.nio.file.Files.walk(path) + .sorted(java.util.Comparator.reverseOrder()) + .forEach( + p -> { + try { + java.nio.file.Files.delete(p); + } catch (IOException e) { + // Ignore cleanup errors + } + }); Review Comment: The lambda silently 
ignores cleanup errors which could mask issues during testing or debugging. Consider at least logging these errors instead of completely ignoring them, or document why silent failure is acceptable here. ########## .github/workflows/utitcase-paimon-vector.yml: ########## @@ -0,0 +1,65 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +name: UTCase and ITCase Spark 4.x Review Comment: The workflow name "UTCase and ITCase Spark 4.x" is incorrect for a paimon-vector workflow. This should be something like "UTCase and ITCase Paimon Vector" to accurately reflect what this workflow tests. ```suggestion name: UTCase and ITCase Paimon Vector ``` ########## paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link VectorGlobalIndexWriter} and {@link VectorGlobalIndexReader}. */ +public class VectorGlobalIndexTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path indexPath; + private DataType vectorType; + + @BeforeEach + public void setup() { + fileIO = new LocalFileIO(); + indexPath = new Path(tempDir.toString()); + vectorType = new ArrayType(new FloatType()); + } + + @AfterEach + public void cleanup() throws IOException { + if (fileIO != null) { + fileIO.delete(indexPath, true); + } + } + + private GlobalIndexFileWriter createFileWriter(Path path) { + return new GlobalIndexFileWriter() { + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + + @Override + public OutputStream newOutputStream(String fileName) throws IOException { + return fileIO.newOutputStream(new Path(path, fileName), false); + } + }; + } + + private GlobalIndexFileReader createFileReader(Path path) { + return fileName -> fileIO.newInputStream(new Path(path, fileName)); + } + + @Test + public void testWriteAndReadVectorIndex() throws IOException { + Options options = createDefaultOptions(); + int dimension = 128; + int numVectors = 100; + + // Create writer and write vectors + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + assertThat(result.fileName()).isNotNull(); + assertThat(result.rowRange()).isEqualTo(new Range(0, numVectors - 1)); + + // Create reader and verify + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Test search - query with first vector should return itself + GlobalIndexResult 
searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testSearchSimilarVectors() throws IOException { + Options options = createDefaultOptions(); + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + // Create vectors with known similarities + float[] baseVector = new float[dimension]; + for (int i = 0; i < dimension; i++) { + baseVector[i] = 1.0f; + } + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Write base vector and similar variants + writer.write(new FloatVectorIndex(0, baseVector)); + + for (int i = 1; i < numVectors; i++) { + float[] variant = baseVector.clone(); + // Add small noise + variant[i % dimension] += (i % 2 == 0 ? 0.1f : -0.1f); + writer.write(new FloatVectorIndex(i, variant)); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with base vector should return similar vectors + GlobalIndexResult searchResult = reader.search(baseVector, 10); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testDifferentSimilarityFunctions() throws IOException { + int dimension = 32; + int numVectors = 20; + + String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"}; + + for (String metric : metrics) { + Options options = createDefaultOptions(); + options.setString("vector.metric", metric); + options.setInteger("vector.dim", dimension); // Set correct dimension + + Path metricIndexPath = new Path(indexPath, metric.toLowerCase()); + GlobalIndexFileWriter fileWriter = createFileWriter(metricIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(metricIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(metricIndexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this metric + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 3); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDifferentDimensions() throws IOException { + int[] dimensions = {8, 32, 128, 256}; + + for (int dimension : dimensions) { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", dimension); + + Path dimIndexPath = new Path(indexPath, "dim_" + dimension); + 
GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + int numVectors = 10; + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(dimIndexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(dimIndexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with this dimension + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + } + + @Test + public void testDimensionMismatch() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.dim", 64); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Try to write vector with wrong dimension + float[] wrongDimVector = new float[32]; // Wrong dimension + assertThatThrownBy(() -> writer.write(new FloatVectorIndex(0, wrongDimVector))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("dimension mismatch"); + } + + @Test + public void testEmptyIndex() throws IOException { + Options options = createDefaultOptions(); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).isEmpty(); + } + + @Test + public void testHNSWParameters() throws IOException { + Options options = createDefaultOptions(); + options.setInteger("vector.m", 32); // Larger M for better recall + options.setInteger("vector.ef-construction", 200); // Larger ef for better quality + + int dimension = 64; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 50; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Verify search works with custom HNSW parameters + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 5); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void 
testLargeK() throws IOException { + Options options = createDefaultOptions(); + int dimension = 32; + options.setInteger("vector.dim", dimension); // Set correct dimension + int numVectors = 100; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + List<float[]> testVectors = generateRandomVectors(numVectors, dimension); + for (int i = 0; i < numVectors; i++) { + writer.write(new FloatVectorIndex(i, testVectors.get(i))); + } + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Search with k larger than number of vectors + GlobalIndexResult searchResult = reader.search(testVectors.get(0), 200); + assertThat(searchResult).isNotNull(); + + reader.close(); + } + + @Test + public void testVectorIndexEndToEnd() throws IOException { + Options options = createDefaultOptions(); + int dimension = 2; + options.setInteger("vector.dim", dimension); + options.setString("vector.metric", "EUCLIDEAN"); + options.setInteger("vector.m", 16); + options.setInteger("vector.ef-construction", 100); + options.setInteger("vector.size-per-index", 3); + + // Create semantic vectors representing "documents" + // Apple [0.9, 0.1] and Banana [0.8, 0.2] and Orange [0.85, 0.15] are similar (fruits) + // Car [0.1, 0.9], Bike [0.15, 0.85], and Truck [0.2, 0.8] are different (vehicles) + float[] appleVector = new float[] {0.9f, 0.1f}; + float[] bananaVector = new float[] {0.8f, 0.2f}; + float[] carVector = new float[] {0.1f, 0.9f}; + float[] orangeVector = new float[] {0.85f, 0.15f}; + float[] bikeVector = new float[] {0.15f, 0.85f}; + float[] truckVector = new float[] {0.2f, 0.8f}; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, vectorType, options); + + // Write vectors with row IDs representing documents + long appleId = 0L; + long bananaId = 1L; + long carId = 2L; + long orangeId = 3L; + long bikeId = 4L; + long truckId = 5L; + + writer.write(new FloatVectorIndex(appleId, appleVector)); + writer.write(new FloatVectorIndex(bananaId, bananaVector)); + writer.write(new FloatVectorIndex(carId, carVector)); + writer.write(new FloatVectorIndex(orangeId, orangeVector)); + writer.write(new FloatVectorIndex(bikeId, bikeVector)); + writer.write(new FloatVectorIndex(truckId, truckVector)); + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(2); + + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + for (GlobalIndexWriter.ResultEntry result : results) { + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, result.fileName())), + result.rowRange(), + result.meta())); + } + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Test 1: Query with vector similar to "Apple" - should find Apple, Banana, and Orange + // Query: [0.85, 0.15] is between Apple and Banana 
+ float[] queryVector = new float[] {0.85f, 0.15f}; + GlobalIndexResult searchResult = reader.search(queryVector, 1); + + assertThat(searchResult).isNotNull(); + assertThat(searchResult.results().iterator().hasNext()) + .as("Search should return results") + .isTrue(); + + // Collect all result IDs + List<Long> resultIds = new ArrayList<>(); + searchResult.results().iterator().forEachRemaining(resultIds::add); + + // Should find Apple, Banana, and Orange (similar fruits), not vehicles + assertThat(resultIds).as("Should find fruits").containsAnyOf(appleId, bananaId, orangeId); + assertThat(resultIds.size()).as("Should return top 3 results").isLessThanOrEqualTo(3); + + // Test 2: Query with exact match - should find exact document + GlobalIndexResult exactMatchResult = reader.search(appleVector, 1); + assertThat(containsRowId(exactMatchResult, appleId)) + .as("Exact match query should find Apple") + .isTrue(); + + // Test 3: Query with vector similar to "Car" - should find Car first + float[] carQueryVector = new float[] {0.15f, 0.85f}; + GlobalIndexResult carSearchResult = reader.search(carQueryVector, 1); + + List<Long> carResultIds = new ArrayList<>(); + carSearchResult.results().iterator().forEachRemaining(carResultIds::add); + + assertThat(carResultIds).as("Query similar to Car should find Car").contains(carId); + + // Test 4: Search with larger k than available documents + GlobalIndexResult largeKResult = reader.search(queryVector, 10); + assertThat(largeKResult).isNotNull(); + assertThat(largeKResult.results().iterator().hasNext()).isTrue(); + + // Test 5: Verify all six documents are indexed + GlobalIndexResult allDocsResult = reader.search(new float[] {0.5f, 0.5f}, 6); + List<Long> allResultIds = new ArrayList<>(); + allDocsResult.results().iterator().forEachRemaining(allResultIds::add); + + assertThat(allResultIds.size()) + .as("Should be able to retrieve all 6 documents") + .isGreaterThanOrEqualTo(1) + .isLessThanOrEqualTo(6); + + reader.close(); + } + + @Test + public void testByteVectorIndexEndToEnd() throws IOException { + Options options = createDefaultOptions(); + int dimension = 2; + options.setInteger("vector.dim", dimension); + options.setString("vector.metric", "EUCLIDEAN"); + options.setInteger("vector.m", 16); + options.setInteger("vector.ef-construction", 100); + + // Create semantic vectors representing "documents" using byte vectors + // Apple [90, 10] and Banana [80, 20] are similar (fruits) + // Car [10, 90] is different (vehicle) + byte[] appleVector = new byte[] {90, 10}; + byte[] bananaVector = new byte[] {80, 20}; + byte[] carVector = new byte[] {10, 90}; + + DataType byteVectorType = new ArrayType(new TinyIntType()); + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = + new VectorGlobalIndexWriter(fileWriter, byteVectorType, options); + + // Write vectors with row IDs representing documents + long appleId = 0L; + long bananaId = 1L; + long carId = 2L; + + writer.write(new ByteVectorIndex(appleId, appleVector)); + writer.write(new ByteVectorIndex(bananaId, bananaVector)); + writer.write(new ByteVectorIndex(carId, carVector)); + + List<GlobalIndexWriter.ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + GlobalIndexWriter.ResultEntry result = results.get(0); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + List<GlobalIndexIOMeta> metas = new ArrayList<>(); + metas.add( + new GlobalIndexIOMeta( + result.fileName(), + fileIO.getFileSize(new Path(indexPath, 
result.fileName())), + result.rowRange(), + result.meta())); + + VectorGlobalIndexReader reader = new VectorGlobalIndexReader(fileReader, metas); + + // Test 1: Query with vector similar to "Apple" - should find Apple and Banana + // Query: [85, 15] is between Apple and Banana + byte[] queryVector = new byte[] {85, 15}; + GlobalIndexResult searchResult = reader.search(queryVector, 2); + + assertThat(searchResult).isNotNull(); + assertThat(searchResult.results().iterator().hasNext()) + .as("Search should return results") + .isTrue(); + + // Collect all result IDs + List<Long> resultIds = new ArrayList<>(); + searchResult.results().iterator().forEachRemaining(resultIds::add); + + // Should find Apple and Banana (similar fruits), not Car + assertThat(resultIds).as("Should find Apple (most similar)").contains(appleId); + assertThat(resultIds).as("Should find Banana (second most similar)").contains(bananaId); + assertThat(resultIds.size()).as("Should return top 2 results").isLessThanOrEqualTo(2); + + // Test 2: Query with exact match - should find exact document + GlobalIndexResult exactMatchResult = reader.search(appleVector, 1); + assertThat(containsRowId(exactMatchResult, appleId)) + .as("Exact match query should find Apple") + .isTrue(); + + // Test 3: Query with vector similar to "Car" - should find Car first + byte[] carQueryVector = new byte[] {15, 85}; + GlobalIndexResult carSearchResult = reader.search(carQueryVector, 1); + + List<Long> carResultIds = new ArrayList<>(); + carSearchResult.results().iterator().forEachRemaining(carResultIds::add); + + assertThat(carResultIds).as("Query similar to Car should find Car").contains(carId); + + // Test 4: Search with larger k than available documents + GlobalIndexResult largeKResult = reader.search(queryVector, 10); + assertThat(largeKResult).isNotNull(); + assertThat(largeKResult.results().iterator().hasNext()).isTrue(); + + // Test 5: Verify all three documents are indexed + GlobalIndexResult allDocsResult = reader.search(new byte[] {50, 50}, 3); + List<Long> allResultIds = new ArrayList<>(); + allDocsResult.results().iterator().forEachRemaining(allResultIds::add); + + assertThat(allResultIds.size()) + .as("Should be able to retrieve all 3 documents") + .isGreaterThanOrEqualTo(1) + .isLessThanOrEqualTo(3); + + reader.close(); + } + + private Options createDefaultOptions() { + Options options = new Options(); + options.setInteger("vector.dim", 128); + options.setString("vector.metric", "COSINE"); + options.setInteger("vector.m", 16); + options.setInteger("vector.ef-construction", 100); + return options; + } Review Comment: The `createDefaultOptions()` method sets the metric to "COSINE" but many tests override this to use "EUCLIDEAN". Consider renaming this method to something like `createBaseOptions()` or documenting that these are base settings that tests typically override, to avoid confusion about what the "default" actually means. ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MMapDirectory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. + */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming + + private final List<IndexSearcher> searchers; + private final List<Directory> directories; + private final List<java.nio.file.Path> tempDirs; + + public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + this.searchers = new ArrayList<>(); + this.directories = new ArrayList<>(); + this.tempDirs = new ArrayList<>(); + loadIndices(fileReader, files); + } + + /** + * Search for similar vectors using Lucene KNN search. 
+ * + * @param query query vector + * @param k number of results + * @return global index result containing row IDs + */ + public GlobalIndexResult search(float[] query, int k) { + KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + public GlobalIndexResult search(byte[] query, int k) { + KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + @Override + public void close() throws IOException { + // Close readers + for (IndexSearcher searcher : searchers) { + searcher.getIndexReader().close(); + } + searchers.clear(); + + // Close directories + for (Directory directory : directories) { + directory.close(); + } + directories.clear(); + + // Clean up temp directories + for (java.nio.file.Path tempDir : tempDirs) { + deleteDirectory(tempDir); + } + tempDirs.clear(); + } + + private GlobalIndexResult search(Query query, int k) { + RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64(); + for (IndexSearcher searcher : searchers) { + try { + // Execute search + TopDocs topDocs = searcher.search(query, k); + StoredFields storedFields = searcher.storedFields(); + Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD); + // Collect row IDs from results + for (org.apache.lucene.search.ScoreDoc scoreDoc : topDocs.scoreDocs) { + float rawScore = scoreDoc.score; Review Comment: The variable `rawScore` is declared but never used. Consider removing it or using it for logging/debugging purposes if score information is relevant. ```suggestion ``` ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MMapDirectory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. + */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming + + private final List<IndexSearcher> searchers; + private final List<Directory> directories; + private final List<java.nio.file.Path> tempDirs; + + public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + this.searchers = new ArrayList<>(); + this.directories = new ArrayList<>(); + this.tempDirs = new ArrayList<>(); + loadIndices(fileReader, files); + } + + /** + * Search for similar vectors using Lucene KNN search. 
+ * + * @param query query vector + * @param k number of results + * @return global index result containing row IDs + */ + public GlobalIndexResult search(float[] query, int k) { + KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + public GlobalIndexResult search(byte[] query, int k) { + KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + @Override + public void close() throws IOException { + // Close readers + for (IndexSearcher searcher : searchers) { + searcher.getIndexReader().close(); + } + searchers.clear(); + + // Close directories + for (Directory directory : directories) { + directory.close(); + } + directories.clear(); + + // Clean up temp directories + for (java.nio.file.Path tempDir : tempDirs) { + deleteDirectory(tempDir); + } + tempDirs.clear(); + } + + private GlobalIndexResult search(Query query, int k) { + RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64(); + for (IndexSearcher searcher : searchers) { + try { + // Execute search + TopDocs topDocs = searcher.search(query, k); + StoredFields storedFields = searcher.storedFields(); + Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD); + // Collect row IDs from results + for (org.apache.lucene.search.ScoreDoc scoreDoc : topDocs.scoreDocs) { + float rawScore = scoreDoc.score; + Document doc = storedFields.document(scoreDoc.doc, fieldsToLoad); + long rowId = doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue(); + roaringBitmap64.add(rowId); + } + } catch (IOException e) { + throw new RuntimeException("Failed to search vector index", e); + } + } + + return GlobalIndexResult.create(() -> roaringBitmap64); + } + + private void loadIndices(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + for (GlobalIndexIOMeta meta : files) { + try (SeekableInputStream in = fileReader.getInputStream(meta.fileName())) { + Directory directory = deserializeDirectory(in); + directories.add(directory); + + IndexReader reader = DirectoryReader.open(directory); + IndexSearcher searcher = new IndexSearcher(reader); + searchers.add(searcher); + } + } + } + + private Directory deserializeDirectory(SeekableInputStream in) throws IOException { + // Create temporary directory for MMap + Path tempDir = Files.createTempDirectory("paimon-vector-read" + UUID.randomUUID()); + tempDirs.add(tempDir); + Directory directory = new MMapDirectory(tempDir); + + // Read number of files + int numFiles = readInt(in); + + // Reusable buffer for streaming + byte[] buffer = new byte[BUFFER_SIZE]; + + for (int i = 0; i < numFiles; i++) { + // Read file name + int nameLength = readInt(in); + byte[] nameBytes = new byte[nameLength]; + readFully(in, nameBytes); + String fileName = new String(nameBytes, StandardCharsets.UTF_8); + + // Read file content length + long fileLength = readLong(in); + + // Stream file content directly to directory + try (IndexOutput output = directory.createOutput(fileName, null)) { + long remaining = fileLength; + while (remaining > 0) { + int toRead = (int) Math.min(buffer.length, remaining); + readFully(in, buffer, 0, toRead); + output.writeBytes(buffer, 0, toRead); + remaining -= toRead; + } + } + } + + return directory; + } + + private int readInt(SeekableInputStream in) throws IOException { + byte[] bytes = new byte[4]; + readFully(in, bytes); + return ByteBuffer.wrap(bytes).getInt(); + } + + private long 
readLong(SeekableInputStream in) throws IOException { + byte[] bytes = new byte[8]; + readFully(in, bytes); + return ByteBuffer.wrap(bytes).getLong(); + } + + private void readFully(SeekableInputStream in, byte[] buffer) throws IOException { + readFully(in, buffer, 0, buffer.length); + } + + private void readFully(SeekableInputStream in, byte[] buffer, int offset, int length) + throws IOException { + int totalRead = 0; + while (totalRead < length) { + int read = in.read(buffer, offset + totalRead, length - totalRead); + if (read == -1) { + throw new IOException("Unexpected end of stream"); + } + totalRead += read; + } + } + + private void deleteDirectory(java.nio.file.Path path) throws IOException { + if (java.nio.file.Files.exists(path)) { + java.nio.file.Files.walk(path) + .sorted(java.util.Comparator.reverseOrder()) + .forEach( + p -> { + try { + java.nio.file.Files.delete(p); + } catch (IOException e) { + // Ignore cleanup errors + } + }); Review Comment: The lambda silently ignores cleanup errors which could mask issues during testing or debugging. Consider at least logging these errors instead of completely ignoring them, or document why silent failure is acceptable here. This is the same issue as in VectorGlobalIndexWriter. ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorIndexOptions.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; +import org.apache.paimon.options.Options; + +/** Options for vector index. */ +public class VectorIndexOptions { + + public static final ConfigOption<Integer> VECTOR_DIM = + ConfigOptions.key("vector.dim") + .intType() + .defaultValue(128) + .withDescription("The dimension of the vector"); + + public static final ConfigOption<String> VECTOR_METRIC = + ConfigOptions.key("vector.metric") + .stringType() + .defaultValue("EUCLIDEAN") + .withDescription( + "The similarity metric for vector search (COSINE, DOT_PRODUCT, EUCLIDEAN, MAX_INNER_PRODUCT)"); Review Comment: The default similarity metric is "EUCLIDEAN" but the documentation in the PR description mentions COSINE, DOT_PRODUCT, and EUCLIDEAN without specifying which is the default. Consider documenting why EUCLIDEAN was chosen as the default, as COSINE is often preferred for semantic search use cases mentioned in the PR purpose. ```suggestion "The similarity metric for vector search. Options are COSINE, DOT_PRODUCT, EUCLIDEAN, MAX_INNER_PRODUCT. " + "The default is EUCLIDEAN for compatibility with common vector index algorithms. 
" + "Note: COSINE is often preferred for semantic search use cases, but EUCLIDEAN is used here as the default for broader applicability."); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
