kaori-seasons commented on code in PR #6773: URL: https://github.com/apache/paimon/pull/6773#discussion_r2601268475
########## paimon-lucene/src/main/java/org/apache/paimon/lucene/index/VectorGlobalIndexReader.java: ########## @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lucene.index; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.KnnByteVectorQuery; +import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import 
java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * Vector global index reader using Apache Lucene 9.x. + * + * <p>This implementation uses Lucene's native KnnFloatVectorQuery with HNSW graph for efficient + * approximate nearest neighbor search. + */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private static final int BUFFER_SIZE = 8192; // 8KB buffer for streaming + + private final List<IndexSearcher> searchers; + private final List<IndexMMapDirectory> directories; + + public VectorGlobalIndexReader(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + this.searchers = new ArrayList<>(); + this.directories = new ArrayList<>(); + loadIndices(fileReader, files); + } + + /** + * Search for similar vectors using Lucene KNN search. + * + * @param query query vector + * @param k number of results + * @return global index result containing row IDs + */ + public GlobalIndexResult search(float[] query, int k) { + KnnFloatVectorQuery knnQuery = new KnnFloatVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + public GlobalIndexResult search(byte[] query, int k) { + KnnByteVectorQuery knnQuery = new KnnByteVectorQuery(VectorIndex.VECTOR_FIELD, query, k); + return search(knnQuery, k); + } + + @Override + public void close() throws IOException { + Throwable firstException = null; + + // Close readers + for (IndexSearcher searcher : searchers) { + try { + searcher.getIndexReader().close(); + } catch (Throwable t) { + if (firstException == null) { + firstException = t; + } else { + firstException.addSuppressed(t); + } + } + } + searchers.clear(); + + // Close directories + for (IndexMMapDirectory directory : directories) { + try { + directory.close(); + } catch (Throwable t) { + if (firstException == null) { + firstException = t; + } else { + firstException.addSuppressed(t); + } + } + } + directories.clear(); + + if 
(firstException != null) { + if (firstException instanceof IOException) { + throw (IOException) firstException; + } else if (firstException instanceof RuntimeException) { + throw (RuntimeException) firstException; + } else { + throw new RuntimeException( + "Failed to close vector global index reader", firstException); + } + } + } + + private GlobalIndexResult search(Query query, int k) { + PriorityQueue<ScoredRow> topK = + new PriorityQueue<>(Comparator.comparingDouble(sr -> sr.score)); + for (IndexSearcher searcher : searchers) { + try { + TopDocs topDocs = searcher.search(query, k); + StoredFields storedFields = searcher.storedFields(); + Set<String> fieldsToLoad = Set.of(VectorIndex.ROW_ID_FIELD); + for (org.apache.lucene.search.ScoreDoc scoreDoc : topDocs.scoreDocs) { + Document doc = storedFields.document(scoreDoc.doc, fieldsToLoad); + long rowId = doc.getField(VectorIndex.ROW_ID_FIELD).numericValue().longValue(); + if (topK.size() < k) { + topK.offer(new ScoredRow(rowId, scoreDoc.score)); + } else { + if (topK.peek() != null && scoreDoc.score > topK.peek().score) { + topK.poll(); + topK.offer(new ScoredRow(rowId, scoreDoc.score)); + } + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to search vector index", e); + } + } + RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64(); + for (ScoredRow scoredRow : topK) { + roaringBitmap64.add(scoredRow.rowId); + } + return GlobalIndexResult.create(() -> roaringBitmap64); + } + + /** Helper class to store row ID with its score. 
*/ + private static class ScoredRow { + final long rowId; + final float score; + + ScoredRow(long rowId, float score) { + this.rowId = rowId; + this.score = score; + } + } + + private void loadIndices(GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) + throws IOException { + for (GlobalIndexIOMeta meta : files) { + try (SeekableInputStream in = fileReader.getInputStream(meta.fileName())) { + IndexMMapDirectory directory = null; + IndexReader reader = null; + boolean success = false; + try { + directory = deserializeDirectory(in); Review Comment: Current situation: `serializeDirectory` / `deserializeDirectory` only write the number of files, the length of each filename, the filename itself, and the file length, then directly write the bytes. There is no magic number, version number, or verification/checksum mechanism. Impact: If the serialization structure is modified in the future, older versions will not be able to recognize it; it's difficult to pinpoint which part is corrupted if the stream is damaged; security/robustness is poor (prone to silent corruption). Recommendation: Add a magic number, version number, and CRC32 for each file (or an overall checksum), and use standard `DataOutputStream`/`DataInputStream` to write `int`/`long`, avoiding manual `ByteBuffer` wrapping. This will be more robust and easier to maintain backward compatibility (using a version field). 
examples: ``` private static final int MAGIC = 0x50414D4F; // 'PAMO' or any magic private static final short VERSION = 1; private void serializeDirectory(Directory directory, OutputStream out) throws IOException { DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(out)); dos.writeInt(MAGIC); dos.writeShort(VERSION); String[] files = directory.listAll(); dos.writeInt(files.length); for (String fileName : files) { byte[] nameBytes = fileName.getBytes(StandardCharsets.UTF_8); dos.writeInt(nameBytes.length); dos.write(nameBytes); long fileLength = directory.fileLength(fileName); dos.writeLong(fileLength); try (IndexInput input = directory.openInput(fileName, IOContext.DEFAULT)) { byte[] buffer = new byte[32 * 1024]; long remaining = fileLength; CRC32 crc = new CRC32(); while (remaining > 0) { int toRead = (int) Math.min(buffer.length, remaining); input.readBytes(buffer, 0, toRead); dos.write(buffer, 0, toRead); crc.update(buffer, 0, toRead); remaining -= toRead; } dos.writeLong(crc.getValue()); // per-file checksum } } dos.flush(); } ``` And the corresponding `deserializeDirectory` function (which reads the magic number/version and verifies the CRC). 
``` private IndexMMapDirectory deserializeDirectory(SeekableInputStream in) throws IOException { DataInputStream dis = new DataInputStream(new BufferedInputStream(in)); int magic = dis.readInt(); if (magic != MAGIC) { throw new IOException("Invalid vector index file magic: " + Integer.toHexString(magic)); } short version = dis.readShort(); if (version != VERSION) { throw new IOException("Unsupported vector index version: " + version); } IndexMMapDirectory indexMMapDirectory = new IndexMMapDirectory(); try { int numFiles = dis.readInt(); byte[] buffer = new byte[BUFFER_SIZE]; for (int i = 0; i < numFiles; i++) { int nameLength = dis.readInt(); byte[] nameBytes = new byte[nameLength]; dis.readFully(nameBytes); String fileName = new String(nameBytes, StandardCharsets.UTF_8); long fileLength = dis.readLong(); long expectedCrc = -1L; // We need to stream file data to index output, and compute CRC while reading try (IndexOutput output = indexMMapDirectory.directory().createOutput(fileName, IOContext.DEFAULT)) { long remaining = fileLength; CRC32 crc = new CRC32(); while (remaining > 0) { int toRead = (int) Math.min(buffer.length, remaining); dis.readFully(buffer, 0, toRead); output.writeBytes(buffer, 0, toRead); crc.update(buffer, 0, toRead); remaining -= toRead; } expectedCrc = dis.readLong(); if (crc.getValue() != expectedCrc) { throw new IOException("CRC mismatch for file " + fileName); } } } return indexMMapDirectory; } catch (Exception e) { try { indexMMapDirectory.close(); } catch (Exception ee) { // ignore or add suppressed e.addSuppressed(ee); } if (e instanceof IOException) { throw (IOException) e; } else { throw new IOException("Failed to deserialize directory", e); } } } ``` ########## paimon-lucene/src/main/java/org/apache/paimon/lucene/index/IndexMMapDirectory.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lucene.index; + +import org.apache.lucene.store.MMapDirectory; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.UUID; + +/** A wrapper of MMapDirectory for vector index. */ +public class IndexMMapDirectory implements AutoCloseable { + private final Path path; + private final MMapDirectory mmapDirectory; + + public IndexMMapDirectory() throws IOException { + this.path = java.nio.file.Files.createTempDirectory("paimon-lucene-" + UUID.randomUUID()); + this.mmapDirectory = new MMapDirectory(path); + } + + public MMapDirectory directory() { + return mmapDirectory; + } + + public void close() throws Exception { Review Comment: Current situation: `IndexMMapDirectory` creates a temporary directory; `close` calls `mmapDirectory.close()` and then uses `walk` to delete that directory. There are two problems: (1) On Windows, a memory-mapped file may still be held open and therefore cannot be deleted, and `close` might not completely release the mapping (explicit unmap is usually not possible at the JVM level). (2) `close` is declared as `throws Exception` (the `AutoCloseable` signature) yet swallows the deletion exception (cleanup errors are ignored), without any warning or fallback. 
Recommendation: Have `close` throw an `IOException` (or at least log the failure) instead of swallowing the exception completely. As a fallback when deletion fails, call `Files.walk(...).forEach(p -> p.toFile().deleteOnExit())`. This schedules best-effort cleanup at JVM termination (although not ideal). On Windows platforms, you can fall back to a non-mmap `FSDirectory` implementation (e.g. `NIOFSDirectory`), or retry the deletion a few times with a short wait between attempts. Create the temporary directory in a more controllable parent directory, or allow injection of a base temp dir (for easier testing and cleanup). Example: ``` public class IndexMMapDirectory implements AutoCloseable { private final Path path; private final MMapDirectory mmapDirectory; public IndexMMapDirectory() throws IOException { this.path = Files.createTempDirectory("paimon-lucene-" + UUID.randomUUID()); this.mmapDirectory = new MMapDirectory(path); this.path.toFile().deleteOnExit(); // best-effort } public MMapDirectory directory() { return mmapDirectory; } @Override public void close() throws IOException { IOException firstEx = null; try { mmapDirectory.close(); } catch (IOException e) { firstEx = e; } // attempt to delete files; if fails, mark deleteOnExit try { if (Files.exists(path)) { Files.walk(path) .sorted(Comparator.reverseOrder()) .forEach(p -> { try { Files.delete(p); } catch (IOException e) { // fallback: schedule delete on exit p.toFile().deleteOnExit(); } }); } } catch (IOException e) { if (firstEx == null) { firstEx = e; } else { firstEx.addSuppressed(e); } } if (firstEx != null) { throw firstEx; } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
