Copilot commented on code in PR #6716: URL: https://github.com/apache/paimon/pull/6716#discussion_r2591981788
########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.Range; + +import org.apache.lucene.codecs.KnnVectorsFormat; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.store.Directory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Vector global 
index writer using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW algorithm for efficient + * approximate nearest neighbor search. + */ +public class VectorGlobalIndexWriter implements GlobalIndexWriter { + + private final GlobalIndexFileWriter fileWriter; + private final DataType fieldType; + private final VectorIndexOptions vectorOptions; + private final VectorSimilarityFunction similarityFunction; + private final int sizePerIndex; + + private final List<VectorIndex> vectors; + + public VectorGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, DataType fieldType, Options options) { + checkArgument( + fieldType instanceof ArrayType, + "Vector field type must be ARRAY, but was: " + fieldType); + this.fileWriter = fileWriter; + this.fieldType = fieldType; + this.vectors = new ArrayList<>(); + this.vectorOptions = new VectorIndexOptions(options); + this.similarityFunction = parseMetricToLucene(vectorOptions.metric()); + this.sizePerIndex = vectorOptions.sizePerIndex(); + } + + @Override + public void write(Object key) { + if (key instanceof FloatVectorIndex) { + FloatVectorIndex vectorKey = (FloatVectorIndex) key; + float[] vector = vectorKey.vector(); + + checkArgument( + vector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + vector.length); + + vectors.add(vectorKey); + } else if (key instanceof ByteVectorIndex) { + ByteVectorIndex vectorKey = (ByteVectorIndex) key; + byte[] byteVector = vectorKey.vector(); + + checkArgument( + byteVector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + byteVector.length); + + vectors.add(vectorKey); + } else { + throw new IllegalArgumentException( + "Unsupported index type: " + key.getClass().getName()); + } + } + + @Override + public List<ResultEntry> finish() { + try { + if (vectors.isEmpty()) { + return new 
ArrayList<>(); + } + + List<ResultEntry> results = new ArrayList<>(); + + // Split vectors into batches if size exceeds sizePerIndex + int totalVectors = vectors.size(); + int numBatches = (int) Math.ceil((double) totalVectors / sizePerIndex); + + for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) { + int startIdx = batchIndex * sizePerIndex; + int endIdx = Math.min(startIdx + sizePerIndex, totalVectors); + List<VectorIndex> batchVectors = vectors.subList(startIdx, endIdx); + + // Build index + byte[] indexBytes = + buildIndex( + batchVectors, + this.vectorOptions.m(), + this.vectorOptions.efConstruction(), + this.vectorOptions.writeBufferSize()); + + // Write to file + String fileName = fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER); + try (OutputStream out = fileWriter.newOutputStream(fileName)) { + out.write(indexBytes); + } + long minRowIdInBatch = batchVectors.get(0).rowId(); + long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 1).rowId(); + results.add( + ResultEntry.of( + fileName, null, new Range(minRowIdInBatch, maxRowIdInBatch))); + } + + return results; + } catch (IOException e) { + throw new RuntimeException("Failed to write vector global index", e); + } + } + + private VectorSimilarityFunction parseMetricToLucene(String metric) { + switch (metric.toUpperCase()) { + case "COSINE": + return VectorSimilarityFunction.COSINE; + case "DOT_PRODUCT": + return VectorSimilarityFunction.DOT_PRODUCT; + case "EUCLIDEAN": + return VectorSimilarityFunction.EUCLIDEAN; + case "MAX_INNER_PRODUCT": + return VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT; + default: + throw new IllegalArgumentException("Unsupported metric: " + metric); + } + } + + private byte[] buildIndex( + List<VectorIndex> batchVectors, int m, int efConstruction, int writeBufferSize) + throws IOException { + + try (IndexMMapDirectory indexMMapDirectory = new IndexMMapDirectory()) { + // Configure index writer + IndexWriterConfig config = getIndexWriterConfig(m, 
efConstruction, writeBufferSize); + + try (IndexWriter writer = new IndexWriter(indexMMapDirectory.directory(), config)) { + // Add each vector as a document + for (VectorIndex vectorIndex : batchVectors) { + Document doc = new Document(); + + // Add KNN vector field + doc.add(vectorIndex.indexableField(similarityFunction)); + + // Store row ID + doc.add(vectorIndex.rowIdStoredField()); + + writer.addDocument(doc); + } + + // Commit changes + writer.commit(); + } + + // Serialize directory to byte array + return serializeDirectory(indexMMapDirectory.directory()); + } catch (Exception e) { + throw new RuntimeException(e); Review Comment: The generic `Exception` is caught and re-thrown as a `RuntimeException` with no message. The cause is kept, but checked `IOException`s become unchecked. As a result, they bypass the `catch (IOException e)` in `finish()`, which is where the "Failed to write vector global index" context is added. Without this catch block, the try-with-resources still closes the directory, and any `IOException` propagates as the method's `throws IOException` declaration already allows. Consider either removing this catch block or being more specific about which exceptions need to be converted. ```suggestion ``` ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. + */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming + + private final List<IndexSearcher> searchers; + private final List<IndexMMapDirectory> directories; + + public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + this.searchers = new ArrayList<>(); + this.directories = new ArrayList<>(); + loadIndices(fileReader, files); + } + + /** + * Search for similar vectors using Lucene KNN search. 
+ * + * @param query query vector + * @param k number of results + * @return global index result containing row IDs + */ + public GlobalIndexResult search(float[] query, int k) { + KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + public GlobalIndexResult search(byte[] query, int k) { + KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + @Override + public void close() throws IOException { + // Close readers + for (IndexSearcher searcher : searchers) { + searcher.getIndexReader().close(); + } + searchers.clear(); + + // Close directories + for (IndexMMapDirectory directory : directories) { + try { + directory.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + directories.clear(); + } + + private GlobalIndexResult search(Query query, int k) { + PriorityQueue<ScoredRow> topK = + new PriorityQueue<>(Comparator.comparingDouble(sr -> sr.score)); + for (IndexSearcher searcher : searchers) { + try { + TopDocs topDocs = searcher.search(query, k); + StoredFields storedFields = searcher.storedFields(); + Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD); + for (org.apache.lucene.search.ScoreDoc scoreDoc : topDocs.scoreDocs) { + Document doc = storedFields.document(scoreDoc.doc, fieldsToLoad); + long rowId = doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue(); + if (topK.size() < k) { + topK.offer(new ScoredRow(rowId, scoreDoc.score)); + } else { + if (topK.peek() != null && scoreDoc.score > topK.peek().score) { + topK.poll(); + topK.offer(new ScoredRow(rowId, scoreDoc.score)); + } + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to search vector index", e); + } + } + RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64(); + for (ScoredRow scoredRow : topK) { + roaringBitmap64.add(scoredRow.rowId); + } + return 
GlobalIndexResult.create(() -> roaringBitmap64); + } + + /** Helper class to store row ID with its score. */ + private static class ScoredRow { + final long rowId; + final float score; + + ScoredRow(long rowId, float score) { + this.rowId = rowId; + this.score = score; + } + } + + private void loadIndices(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + for (GlobalIndexIOMeta meta : files) { + try (SeekableInputStream in = fileReader.getInputStream(meta.fileName())) { + IndexMMapDirectory directory = deserializeDirectory(in); + directories.add(directory); + IndexReader reader = DirectoryReader.open(directory.directory()); + IndexSearcher searcher = new IndexSearcher(reader); + searchers.add(searcher); + } + } + } + + private IndexMMapDirectory deserializeDirectory(SeekableInputStream in) throws IOException { + IndexMMapDirectory indexMMapDirectory = new IndexMMapDirectory(); + + // Read number of files + int numFiles = readInt(in); + + // Reusable buffer for streaming + byte[] buffer = new byte[BUFFER_SIZE]; + + for (int i = 0; i < numFiles; i++) { + // Read file name + int nameLength = readInt(in); + byte[] nameBytes = new byte[nameLength]; + readFully(in, nameBytes); + String fileName = new String(nameBytes, StandardCharsets.UTF_8); + + // Read file content length + long fileLength = readLong(in); + + // Stream file content directly to directory + try (IndexOutput output = indexMMapDirectory.directory().createOutput(fileName, null)) { + long remaining = fileLength; + while (remaining > 0) { + int toRead = (int) Math.min(buffer.length, remaining); + readFully(in, buffer, 0, toRead); + output.writeBytes(buffer, 0, toRead); + remaining -= toRead; + } + } + } + + return indexMMapDirectory; + } + + private int readInt(SeekableInputStream in) throws IOException { + byte[] bytes = new byte[4]; + readFully(in, bytes); + return ByteBuffer.wrap(bytes).getInt(); + } + + private long readLong(SeekableInputStream in) throws IOException 
{ + byte[] bytes = new byte[8]; + readFully(in, bytes); + return ByteBuffer.wrap(bytes).getLong(); + } + + private void readFully(SeekableInputStream in, byte[] buffer) throws IOException { + readFully(in, buffer, 0, buffer.length); + } + + private void readFully(SeekableInputStream in, byte[] buffer, int offset, int length) + throws IOException { + int totalRead = 0; + while (totalRead < length) { + int read = in.read(buffer, offset + totalRead, length - totalRead); + if (read == -1) { + throw new IOException("Unexpected end of stream"); + } + totalRead += read; + } + } + + private void deleteDirectory(java.nio.file.Path path) throws IOException { + if (java.nio.file.Files.exists(path)) { + java.nio.file.Files.walk(path) + .sorted(java.util.Comparator.reverseOrder()) + .forEach( + p -> { + try { + java.nio.file.Files.delete(p); + } catch (IOException e) { + // Ignore cleanup errors + } + }); + } + } + Review Comment: This method `deleteDirectory` is defined but never called anywhere in the class. It appears to be dead code that should be removed. ```suggestion ``` ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.Range; + +import org.apache.lucene.codecs.KnnVectorsFormat; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.store.Directory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Vector global index writer using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW algorithm for efficient + * approximate nearest neighbor search. + */ Review Comment: The class-level documentation mentions "Apache Lucene 9.x" but the actual dependency uses version 9.12.3 specifically. Consider being more specific in the documentation or verify compatibility across all 9.x versions if that's the intent. ########## paimon-vector/src/test/java/org/apache/paimon/vector/VectorGlobalIndexTest.java: ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link VectorGlobalIndexWriter} and {@link VectorGlobalIndexReader}. 
*/ +public class VectorGlobalIndexTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path indexPath; + private DataType vectorType; + + @BeforeEach + public void setup() { + fileIO = new LocalFileIO(); + indexPath = new Path(tempDir.toString()); + vectorType = new ArrayType(new FloatType()); + } + + @AfterEach + public void cleanup() throws IOException { + if (fileIO != null) { + fileIO.delete(indexPath, true); + } + } + + private GlobalIndexFileWriter createFileWriter(Path path) { + return new GlobalIndexFileWriter() { + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + + @Override + public OutputStream newOutputStream(String fileName) throws IOException { + return fileIO.newOutputStream(new Path(path, fileName), false); + } + }; + } + + private GlobalIndexFileReader createFileReader(Path path) { + return fileName -> fileIO.newInputStream(new Path(path, fileName)); + } + + @Test + public void testDifferentSimilarityFunctions() throws IOException { + int dimension = 32; + int numVectors = 20; + + String[] metrics = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"}; Review Comment: The test for different similarity functions doesn't validate "MAX_INNER_PRODUCT" / "MAXIMUM_INNER_PRODUCT" metric mentioned in the documentation. Consider adding it to the `metrics` array to ensure full coverage of supported metrics. ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. 
+ */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming + + private final List<IndexSearcher> searchers; + private final List<IndexMMapDirectory> directories; + + public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + this.searchers = new ArrayList<>(); + this.directories = new ArrayList<>(); + loadIndices(fileReader, files); + } + + /** + * Search for similar vectors using Lucene KNN search. + * + * @param query query vector + * @param k number of results + * @return global index result containing row IDs + */ + public GlobalIndexResult search(float[] query, int k) { + KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + public GlobalIndexResult search(byte[] query, int k) { + KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + @Override + public void close() throws IOException { + // Close readers + for (IndexSearcher searcher : searchers) { + searcher.getIndexReader().close(); + } + searchers.clear(); + + // Close directories + for (IndexMMapDirectory directory : directories) { + try { + directory.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } Review Comment: The generic `Exception` catch block wraps any exception from `directory.close()` in a `RuntimeException`, but the `close()` method signature already allows `IOException` to propagate. This could mask unexpected exceptions. Consider handling only `IOException` or letting exceptions propagate naturally. 
```suggestion directory.close(); ``` ########## .github/workflows/utitcase-paimon-vector.yml: ########## @@ -0,0 +1,65 @@ +################################################################################ Review Comment: The filename contains a typo: "utitcase" should be "unittest" or "unitcase". This should also be corrected in the workflow name on line 19. ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.vector; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. + */ Review Comment: The class-level documentation mentions "Apache Lucene 9.x" but the actual dependency uses version 9.12.3 specifically. Consider being more specific in the documentation or verify compatibility across all 9.x versions if that's the intent. ########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorGlobalIndexWriter.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.Range; + +import org.apache.lucene.codecs.KnnVectorsFormat; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.store.Directory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Vector global index writer using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW algorithm for efficient + * approximate nearest neighbor search. 
+ */ +public class VectorGlobalIndexWriter implements GlobalIndexWriter { + + private final GlobalIndexFileWriter fileWriter; + private final DataType fieldType; + private final VectorIndexOptions vectorOptions; + private final VectorSimilarityFunction similarityFunction; + private final int sizePerIndex; + + private final List<VectorIndex> vectors; + + public VectorGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, DataType fieldType, Options options) { + checkArgument( + fieldType instanceof ArrayType, + "Vector field type must be ARRAY, but was: " + fieldType); + this.fileWriter = fileWriter; + this.fieldType = fieldType; + this.vectors = new ArrayList<>(); + this.vectorOptions = new VectorIndexOptions(options); + this.similarityFunction = parseMetricToLucene(vectorOptions.metric()); + this.sizePerIndex = vectorOptions.sizePerIndex(); + } + + @Override + public void write(Object key) { + if (key instanceof FloatVectorIndex) { + FloatVectorIndex vectorKey = (FloatVectorIndex) key; + float[] vector = vectorKey.vector(); + + checkArgument( + vector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + vector.length); + + vectors.add(vectorKey); + } else if (key instanceof ByteVectorIndex) { + ByteVectorIndex vectorKey = (ByteVectorIndex) key; + byte[] byteVector = vectorKey.vector(); + + checkArgument( + byteVector.length == vectorOptions.dimension(), + "Vector dimension mismatch: expected " + + vectorOptions.dimension() + + ", but got " + + byteVector.length); + + vectors.add(vectorKey); + } else { + throw new IllegalArgumentException( + "Unsupported index type: " + key.getClass().getName()); + } + } + + @Override + public List<ResultEntry> finish() { + try { + if (vectors.isEmpty()) { + return new ArrayList<>(); + } + + List<ResultEntry> results = new ArrayList<>(); + + // Split vectors into batches if size exceeds sizePerIndex + int totalVectors = vectors.size(); + int numBatches = 
(int) Math.ceil((double) totalVectors / sizePerIndex); + + for (int batchIndex = 0; batchIndex < numBatches; batchIndex++) { + int startIdx = batchIndex * sizePerIndex; + int endIdx = Math.min(startIdx + sizePerIndex, totalVectors); + List<VectorIndex> batchVectors = vectors.subList(startIdx, endIdx); + + // Build index + byte[] indexBytes = + buildIndex( + batchVectors, + this.vectorOptions.m(), + this.vectorOptions.efConstruction(), + this.vectorOptions.writeBufferSize()); + + // Write to file + String fileName = fileWriter.newFileName(VectorGlobalIndexerFactory.IDENTIFIER); + try (OutputStream out = fileWriter.newOutputStream(fileName)) { + out.write(indexBytes); + } + long minRowIdInBatch = batchVectors.get(0).rowId(); + long maxRowIdInBatch = batchVectors.get(batchVectors.size() - 1).rowId(); + results.add( + ResultEntry.of( + fileName, null, new Range(minRowIdInBatch, maxRowIdInBatch))); + } + + return results; + } catch (IOException e) { + throw new RuntimeException("Failed to write vector global index", e); + } + } + + private VectorSimilarityFunction parseMetricToLucene(String metric) { + switch (metric.toUpperCase()) { + case "COSINE": + return VectorSimilarityFunction.COSINE; + case "DOT_PRODUCT": + return VectorSimilarityFunction.DOT_PRODUCT; + case "EUCLIDEAN": + return VectorSimilarityFunction.EUCLIDEAN; + case "MAX_INNER_PRODUCT": + return VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT; + default: + throw new IllegalArgumentException("Unsupported metric: " + metric); + } Review Comment: The user-facing metric name "MAX_INNER_PRODUCT" (used in the `vector.metric` option description) differs from Lucene's enum constant `VectorSimilarityFunction.MAXIMUM_INNER_PRODUCT` returned on line 160. The documented name does match the `case "MAX_INNER_PRODUCT"` label after `toUpperCase()`. However, users who pass Lucene's name "MAXIMUM_INNER_PRODUCT" would match no case, fall through to `default`, and get an "Unsupported metric" error. Either add "MAXIMUM_INNER_PRODUCT" as an alias case, or state explicitly in the documentation that only "MAX_INNER_PRODUCT" is accepted.
########## paimon-vector/src/main/java/org/apache/paimon/vector/VectorIndexOptions.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector; + +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; +import org.apache.paimon.options.Options; + +/** Options for vector index. 
*/ +public class VectorIndexOptions { + + public static final ConfigOption<Integer> VECTOR_DIM = + ConfigOptions.key("vector.dim") + .intType() + .defaultValue(128) + .withDescription("The dimension of the vector"); + + public static final ConfigOption<String> VECTOR_METRIC = + ConfigOptions.key("vector.metric") + .stringType() + .defaultValue("EUCLIDEAN") + .withDescription( + "The similarity metric for vector search (COSINE, DOT_PRODUCT, EUCLIDEAN, MAX_INNER_PRODUCT), and EUCLIDEAN is the default"); + + public static final ConfigOption<Integer> VECTOR_M = + ConfigOptions.key("vector.m") + .intType() + .defaultValue(16) + .withDescription( + "The maximum number of connections for each element during the index construction"); + + public static final ConfigOption<Integer> VECTOR_EF_CONSTRUCTION = + ConfigOptions.key("vector.ef-construction") + .intType() + .defaultValue(100) + .withDescription( + "The size of the dynamic candidate list during the index construction"); + + public static final ConfigOption<Integer> VECTOR_SIZE_PER_INDEX = + ConfigOptions.key("vector.size-per-index") + .intType() + .defaultValue(10000) + .withDescription("The size of vectors stored in each vector index file"); + + public static final ConfigOption<Integer> VECTOR_WRITE_BUFFER_SIZE = + ConfigOptions.key("vector.write-buffer-size") + .intType() + .defaultValue(256) + .withDescription("write buffer size in MB for vector index"); + + private final int dimension; + private final String metric; + private final int m; + private final int efConstruction; + private final int sizePerIndex; + private final int writeBufferSize; + + public VectorIndexOptions(Options options) { + this.dimension = options.get(VECTOR_DIM); + this.metric = options.get(VECTOR_METRIC); + this.m = options.get(VECTOR_M); + this.efConstruction = options.get(VECTOR_EF_CONSTRUCTION); + this.sizePerIndex = + options.get(VECTOR_SIZE_PER_INDEX) > 0 + ? 
options.get(VECTOR_SIZE_PER_INDEX) + : VECTOR_SIZE_PER_INDEX.defaultValue(); Review Comment: The ternary expression checks if `options.get(VECTOR_SIZE_PER_INDEX) > 0` and if so, calls `options.get(VECTOR_SIZE_PER_INDEX)` again. This redundant call could be simplified by storing the value in a variable: `int size = options.get(VECTOR_SIZE_PER_INDEX); this.sizePerIndex = size > 0 ? size : VECTOR_SIZE_PER_INDEX.defaultValue();` ```suggestion int size = options.get(VECTOR_SIZE_PER_INDEX); this.sizePerIndex = size > 0 ? size : VECTOR_SIZE_PER_INDEX.defaultValue(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
