Copilot commented on code in PR #7330: URL: https://github.com/apache/paimon/pull/7330#discussion_r2909066102
########## paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java: ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lumina.index; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.VectorSearch; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.aliyun.lumina.LuminaFileInput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; + +/** + * Vector global index reader using Lumina. + * + * <p>This reader loads Lumina indices from global index files and performs vector similarity + * search. + */ +public class LuminaVectorGlobalIndexReader implements GlobalIndexReader { + + private final LuminaIndex[] indices; + private final LuminaIndexMeta[] indexMetas; + private final List<SeekableInputStream> openStreams; + private final List<GlobalIndexIOMeta> ioMetas; + private final GlobalIndexFileReader fileReader; + private final DataType fieldType; + private final LuminaVectorIndexOptions options; + private volatile boolean metasLoaded = false; + private volatile boolean indicesLoaded = false; + + public LuminaVectorGlobalIndexReader( + GlobalIndexFileReader fileReader, + List<GlobalIndexIOMeta> ioMetas, + DataType fieldType, + LuminaVectorIndexOptions options) { + this.fileReader = fileReader; + this.ioMetas = ioMetas; + this.fieldType = fieldType; + this.options = options; + this.indices = new LuminaIndex[ioMetas.size()]; + this.indexMetas = new LuminaIndexMeta[ioMetas.size()]; + this.openStreams = Collections.synchronizedList(new ArrayList<>()); + } + + @Override + public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch vectorSearch) { + try { + ensureLoadMetas(); + + RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds(); + + // Fix #12: Capture loaded indices snapshot under synchronized to ensure visibility. + LuminaIndex[] loadedIndices; + LuminaIndexMeta[] loadedMetas; + + if (includeRowIds != null) { + List<Integer> matchingIndices = new ArrayList<>(); + for (int i = 0; i < indexMetas.length; i++) { + LuminaIndexMeta meta = indexMetas[i]; + // Fix #3: Renamed from containsRange to intersectsRange. + if (includeRowIds.intersectsRange(meta.minId(), meta.maxId())) { + matchingIndices.add(i); + } + } + if (matchingIndices.isEmpty()) { + return Optional.empty(); + } + ensureLoadIndices(matchingIndices); + } else { + ensureLoadAllIndices(); + } + + // Fix #12: Take a snapshot of indices/metas under lock for visibility. + synchronized (this) { + loadedIndices = indices.clone(); + loadedMetas = indexMetas.clone(); + } + + return Optional.ofNullable(search(vectorSearch, loadedIndices, loadedMetas)); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Failed to search Lumina vector index with fieldName=%s, limit=%d", + vectorSearch.fieldName(), vectorSearch.limit()), + e); + } + } + + private GlobalIndexResult search( + VectorSearch vectorSearch, LuminaIndex[] loadedIndices, LuminaIndexMeta[] loadedMetas) + throws IOException { + validateVectorType(vectorSearch.vector()); + float[] queryVector = ((float[]) vectorSearch.vector()).clone(); + int limit = vectorSearch.limit(); + + // Min-heap: smallest score at head, so we can evict the weakest candidate efficiently. + PriorityQueue<ScoredRow> topK = + new PriorityQueue<>(limit + 1, Comparator.comparingDouble(s -> s.score)); + + RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds(); + + // Fix #10: Materialize filter IDs per-shard from bitmap, avoiding full materialization. + + Map<String, String> filterSearchOptions = null; + Map<String, String> plainSearchOptions = null; + if (includeRowIds != null) { + filterSearchOptions = new LinkedHashMap<>(); + // Fix #7: Use long arithmetic to avoid integer overflow. + int listSize = (int) Math.min((long) limit * options.searchFactor(), Integer.MAX_VALUE); + listSize = Math.max(listSize, options.searchListSize()); + filterSearchOptions.put("diskann.search.list_size", String.valueOf(listSize)); + filterSearchOptions.put("search.thread_safe_filter", "true"); + } else { + plainSearchOptions = new LinkedHashMap<>(); + int listSize = Math.max(limit, options.searchListSize()); + plainSearchOptions.put("diskann.search.list_size", String.valueOf(listSize)); + } + + for (int i = 0; i < loadedIndices.length; i++) { + LuminaIndex index = loadedIndices[i]; + if (index == null) { + continue; + } + + int effectiveK = (int) Math.min(limit, index.size()); + if (effectiveK <= 0) { + continue; + } + + if (includeRowIds != null) { + LuminaIndexMeta meta = loadedMetas[i]; + // Fix #10: Extract filter IDs for this shard directly from bitmap. + long[] scopedIds = extractIdsInRange(includeRowIds, meta.minId(), meta.maxId()); + if (scopedIds.length == 0) { + continue; + } + effectiveK = (int) Math.min(effectiveK, scopedIds.length); + + float[] distances = new float[effectiveK]; + long[] labels = new long[effectiveK]; + index.searchWithFilter( + queryVector, + 1, + effectiveK, + distances, + labels, + scopedIds, + filterSearchOptions); + collectResults(distances, labels, effectiveK, limit, topK); + } else { + float[] distances = new float[effectiveK]; + long[] labels = new long[effectiveK]; + index.search(queryVector, 1, effectiveK, distances, labels, plainSearchOptions); + collectResults(distances, labels, effectiveK, limit, topK); + } + } + + RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64(); + HashMap<Long, Float> id2scores = new HashMap<>(topK.size()); + for (ScoredRow row : topK) { + roaringBitmap64.add(row.rowId); + id2scores.put(row.rowId, row.score); + } + return new LuminaScoredGlobalIndexResult(roaringBitmap64, id2scores); + } + + /** + * Collect search results into a min-heap of size {@code limit}. The heap keeps the top-k + * highest-scoring rows; rows with score lower than the current minimum are discarded once the + * heap is full. + */ + private void collectResults( + float[] distances, long[] labels, int count, int limit, PriorityQueue<ScoredRow> topK) { + for (int i = 0; i < count; i++) { + long rowId = labels[i]; + if (rowId < 0) { + continue; + } + float score = convertDistanceToScore(distances[i]); + if (topK.size() < limit) { + topK.offer(new ScoredRow(rowId, score)); + } else if (score > topK.peek().score) { + topK.poll(); + topK.offer(new ScoredRow(rowId, score)); + } + } + } + + /** + * Extract IDs from the bitmap that fall within [minId, maxId] without materializing the entire + * bitmap. Uses the bitmap iterator and collects only IDs in the given range. + */ + private static long[] extractIdsInRange(RoaringNavigableMap64 bitmap, long minId, long maxId) { + List<Long> ids = new ArrayList<>(); + for (long id : bitmap) { + if (id > maxId) { + break; + } + if (id >= minId) { + ids.add(id); + } + } + long[] result = new long[ids.size()]; + for (int i = 0; i < ids.size(); i++) { + result[i] = ids.get(i); + } + return result; + } + + private float convertDistanceToScore(float distance) { + if (options.metric() == LuminaVectorMetric.L2) { + return 1.0f / (1.0f + distance); + } else if (options.metric() == LuminaVectorMetric.COSINE) { + // Cosine distance is in [0, 2]; convert to similarity in [-1, 1] + return 1.0f - distance; + } else { + // Inner product is already a similarity + return distance; + } + } + + private void validateVectorType(Object vector) { + if (!(vector instanceof float[])) { + throw new IllegalArgumentException( + "Expected float[] vector but got: " + vector.getClass()); + } + if (!(fieldType instanceof ArrayType) + || !(((ArrayType) fieldType).getElementType() instanceof FloatType)) { + throw new IllegalArgumentException( + "Lumina currently only supports float arrays, but field type is: " + fieldType); + } + } + + private void ensureLoadMetas() throws IOException { + if (!metasLoaded) { + synchronized (this) { + if (!metasLoaded) { + for (int i = 0; i < ioMetas.size(); i++) { + byte[] metaBytes = ioMetas.get(i).metadata(); + indexMetas[i] = LuminaIndexMeta.deserialize(metaBytes); + } + metasLoaded = true; + } + } + } + } + + private void ensureLoadAllIndices() throws IOException { + if (!indicesLoaded) { + synchronized (this) { + if (!indicesLoaded) { + for (int i = 0; i < ioMetas.size(); i++) { + if (indices[i] == null) { + loadIndexAt(i); + } + } + indicesLoaded = true; + } + } + } + } + + private void ensureLoadIndices(List<Integer> positions) throws IOException { + synchronized (this) { + for (int pos : positions) { + if (indices[pos] == null) { + loadIndexAt(pos); + } + } + // Check if all indices are now loaded. + if (!indicesLoaded) { + boolean allLoaded = true; + for (LuminaIndex idx : indices) { + if (idx == null) { + allLoaded = false; + break; + } + } + if (allLoaded) { + indicesLoaded = true; + } + } + } + } + + private void loadIndexAt(int position) throws IOException { + GlobalIndexIOMeta ioMeta = ioMetas.get(position); + SeekableInputStream in = fileReader.getInputStream(ioMeta); + LuminaIndex index = null; + try { + index = loadIndex(in, position, ioMeta.fileSize()); + openStreams.add(in); + indices[position] = index; + } catch (Exception e) { + IOUtils.closeQuietly(index); + IOUtils.closeQuietly(in); + throw e; + } + } + + private LuminaIndex loadIndex(SeekableInputStream in, int position, long fileSize) + throws IOException { + LuminaIndexMeta meta = indexMetas[position]; + LuminaVectorMetric metric = LuminaVectorMetric.fromValue(meta.metricValue()); + LuminaIndexType indexType = meta.indexType(); + + // Fix #11: Reject UNKNOWN index type instead of passing it to JNI. + if (indexType == LuminaIndexType.UNKNOWN) { + throw new IOException( + "Unsupported Lumina index type in metadata at position " + + position + + ". The index file may have been created by a newer version."); + } + + LuminaFileInput fileInput = new InputStreamFileInput(in); + + // Fix #5: Pass search-time options to LuminaSearcher.create(). + Map<String, String> extraOptions = new LinkedHashMap<>(); + extraOptions.put("diskann.search.list_size", String.valueOf(options.searchListSize())); + return LuminaIndex.fromStream( + fileInput, fileSize, meta.dim(), metric, indexType, extraOptions); Review Comment: More "Fix #..." comments appear here (e.g., "Fix #11", "Fix #5") around the index loading logic. Please remove/replace these with self-contained explanations so the code doesn’t depend on undocumented issue references. ########## paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lumina.index; + +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; + +import org.aliyun.lumina.LuminaFileOutput; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.FloatBuffer; +import java.nio.LongBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** Vector global index writer using Lumina. */ +public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter, Closeable { + + private static final String FILE_NAME_PREFIX = "lumina"; + + private final GlobalIndexFileWriter fileWriter; + private final LuminaVectorIndexOptions options; + private final int sizePerIndex; + private final int dim; + + private long count = 0; // monotonically increasing global row ID across all index files + private long currentIndexMinId = Long.MAX_VALUE; + private long currentIndexMaxId = Long.MIN_VALUE; + private List<float[]> pendingVectors; + private final List<ResultEntry> results; + + public LuminaVectorGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, + DataType fieldType, + LuminaVectorIndexOptions options) { + this.fileWriter = fileWriter; + this.options = options; + this.dim = options.dimension(); + int configuredSize = options.sizePerIndex(); + long buildMemoryLimit = options.buildMemoryLimit(); + int maxByDim = + (int) Math.min(configuredSize, buildMemoryLimit / ((long) dim * Float.BYTES)); + this.sizePerIndex = Math.max(maxByDim, 1); + this.pendingVectors = new ArrayList<>(); + this.results = new ArrayList<>(); + + validateFieldType(fieldType); + } + + private void validateFieldType(DataType dataType) { + if (!(dataType instanceof ArrayType)) { + throw new IllegalArgumentException( + "Lumina vector index requires ArrayType, but got: " + dataType); + } + DataType elementType = ((ArrayType) dataType).getElementType(); + if (!(elementType instanceof FloatType)) { + throw new IllegalArgumentException( + "Lumina vector index requires float array, but got: " + elementType); + } + } + + @Override + public void write(Object fieldData) { + float[] vector; + if (fieldData instanceof float[]) { + vector = ((float[]) fieldData).clone(); + } else if (fieldData instanceof InternalArray) { + vector = ((InternalArray) fieldData).toFloatArray(); + } else { + throw new RuntimeException( + "Unsupported vector type: " + fieldData.getClass().getName()); + } + checkDimension(vector); + + pendingVectors.add(vector); + currentIndexMinId = Math.min(currentIndexMinId, count); + currentIndexMaxId = Math.max(currentIndexMaxId, count); + count++; + + try { + if (pendingVectors.size() >= sizePerIndex) { + buildAndFlushIndex(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public List<ResultEntry> finish() { + try { + if (!pendingVectors.isEmpty()) { + buildAndFlushIndex(); + } + return results; + } catch (IOException e) { + throw new RuntimeException("Failed to write Lumina vector global index", e); + } + } + + /** + * Build a complete DiskANN index from the accumulated vectors: create DirectByteBuffers on + * demand, pretrain, insert all vectors in a single batch, dump directly to the output stream, + * and release buffers. + */ + private void buildAndFlushIndex() throws IOException { + int n = pendingVectors.size(); + if (n == 0) { + return; + } + + LuminaIndex index = createIndex(); + + try { + // Build the full vector buffer from accumulated vectors. + ByteBuffer vectorBuffer = buildVectorBuffer(pendingVectors); + ByteBuffer idBuffer = buildIdBuffer(n, currentIndexMinId); + + // Pretrain phase. + int trainingSize = Math.min(n, options.trainingSize()); + if (trainingSize == n) { + index.pretrain(vectorBuffer, n); + } else { + int[] sampleIndices = reservoirSample(n, trainingSize); + ByteBuffer trainingBuffer = LuminaIndex.allocateVectorBuffer(trainingSize, dim); + FloatBuffer trainingView = trainingBuffer.asFloatBuffer(); + for (int i = 0; i < trainingSize; i++) { + trainingView.put(pendingVectors.get(sampleIndices[i])); + } + index.pretrain(trainingBuffer, trainingSize); + } + + // Insert phase. + index.insertBatch(vectorBuffer, idBuffer, n); + + // Dump to output stream — direct streaming, no temp files. + String fileName = fileWriter.newFileName(FILE_NAME_PREFIX); + try (PositionOutputStream out = fileWriter.newOutputStream(fileName)) { + index.dump(new OutputStreamFileOutput(out)); + out.flush(); + } + + LuminaIndexMeta meta = + new LuminaIndexMeta( + dim, + options.metric().getValue(), + options.indexType().name(), + n, + currentIndexMinId, + currentIndexMaxId); + results.add(new ResultEntry(fileName, n, meta.serialize())); + } finally { + index.close(); + } Review Comment: In `buildAndFlushIndex`, `index.close()` is unconditionally called in `finally`. If `createIndex()` throws (e.g., native library missing/invalid options), `index` will be null and the `finally` will throw an NPE masking the original failure. Guard the close or restructure so `index` is only closed when successfully created. ########## paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala: ########## @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import scala.collection.JavaConverters._ + +/** Tests for Lumina vector index read/write operations. */ +class LuminaVectorIndexTest extends PaimonSparkTestBase { + + private val indexType = "lumina-vector-ann" + private val defaultOptions = "vector.dim=3,vector.index-type=DISKANN" + + // ========== Index Creation Tests ========== Review Comment: Unlike the JUnit Lumina tests (which skip when the native library can't be loaded), this Spark suite has no guard and will fail if `lumina-jni` natives aren’t available in the test runtime. Please add a suite-level `assume(...)`/skip that attempts `Lumina.loadLibrary()` and skips when unavailable, to keep CI/test runs reliable on environments without the native binaries. ########## paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lumina.index; + +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; + +import org.aliyun.lumina.LuminaFileOutput; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.FloatBuffer; +import java.nio.LongBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** Vector global index writer using Lumina. */ +public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter, Closeable { + + private static final String FILE_NAME_PREFIX = "lumina"; + + private final GlobalIndexFileWriter fileWriter; + private final LuminaVectorIndexOptions options; + private final int sizePerIndex; + private final int dim; + + private long count = 0; // monotonically increasing global row ID across all index files + private long currentIndexMinId = Long.MAX_VALUE; + private long currentIndexMaxId = Long.MIN_VALUE; + private List<float[]> pendingVectors; + private final List<ResultEntry> results; + + public LuminaVectorGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, + DataType fieldType, + LuminaVectorIndexOptions options) { + this.fileWriter = fileWriter; + this.options = options; + this.dim = options.dimension(); + int configuredSize = options.sizePerIndex(); + long buildMemoryLimit = options.buildMemoryLimit(); + int maxByDim = + (int) Math.min(configuredSize, buildMemoryLimit / ((long) dim * Float.BYTES)); + this.sizePerIndex = Math.max(maxByDim, 1); + this.pendingVectors = new ArrayList<>(); + this.results = new ArrayList<>(); + + validateFieldType(fieldType); + } + + private void validateFieldType(DataType dataType) { + if (!(dataType instanceof ArrayType)) { + throw new IllegalArgumentException( + "Lumina vector index requires ArrayType, but got: " + dataType); + } + DataType elementType = ((ArrayType) dataType).getElementType(); + if (!(elementType instanceof FloatType)) { + throw new IllegalArgumentException( + "Lumina vector index requires float array, but got: " + elementType); + } + } + + @Override + public void write(Object fieldData) { + float[] vector; + if (fieldData instanceof float[]) { + vector = ((float[]) fieldData).clone(); + } else if (fieldData instanceof InternalArray) { + vector = ((InternalArray) fieldData).toFloatArray(); + } else { + throw new RuntimeException( + "Unsupported vector type: " + fieldData.getClass().getName()); + } Review Comment: `GlobalIndexSingletonWriter.write` accepts `@Nullable` values, but this implementation dereferences `fieldData` in the error path (`fieldData.getClass()`) and will NPE on null vectors. Please handle `null` explicitly (e.g., skip indexing for null vectors or throw a clear IllegalArgumentException). ########## paimon-lumina/pom.xml: ########## @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>paimon-parent</artifactId> + <groupId>org.apache.paimon</groupId> + <version>1.4-SNAPSHOT</version> + </parent> + + <artifactId>paimon-lumina</artifactId> + <name>Paimon : Lumina Index</name> + + <repositories> + <repository> + <id>lumina</id> + <url>https://lumina-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-common</artifactId> + <version>${project.version}</version> + </dependency> Review Comment: `paimon-common` is currently a compile-scope dependency here. Other optional integration modules (e.g., `paimon-lance`) declare `paimon-common` as `provided` to avoid bundling Paimon core classes into the plugin JAR and creating version conflicts. Consider aligning by switching this dependency to `provided` (and adding test-jar deps if needed for tests). ########## paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java: ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lumina.index; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.VectorSearch; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.aliyun.lumina.LuminaFileInput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; + +/** + * Vector global index reader using Lumina. + * + * <p>This reader loads Lumina indices from global index files and performs vector similarity + * search. + */ +public class LuminaVectorGlobalIndexReader implements GlobalIndexReader { + + private final LuminaIndex[] indices; + private final LuminaIndexMeta[] indexMetas; + private final List<SeekableInputStream> openStreams; + private final List<GlobalIndexIOMeta> ioMetas; + private final GlobalIndexFileReader fileReader; + private final DataType fieldType; + private final LuminaVectorIndexOptions options; + private volatile boolean metasLoaded = false; + private volatile boolean indicesLoaded = false; + + public LuminaVectorGlobalIndexReader( + GlobalIndexFileReader fileReader, + List<GlobalIndexIOMeta> ioMetas, + DataType fieldType, + LuminaVectorIndexOptions options) { + this.fileReader = fileReader; + this.ioMetas = ioMetas; + this.fieldType = fieldType; + this.options = options; + this.indices = new LuminaIndex[ioMetas.size()]; + this.indexMetas = new LuminaIndexMeta[ioMetas.size()]; + this.openStreams = Collections.synchronizedList(new ArrayList<>()); + } + + @Override + public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch vectorSearch) { + try { + ensureLoadMetas(); + + RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds(); + + // Fix #12: Capture loaded indices snapshot under synchronized to ensure visibility. + LuminaIndex[] loadedIndices; + LuminaIndexMeta[] loadedMetas; + + if (includeRowIds != null) { Review Comment: The inline comments like "Fix #12" / "Fix #3" read like references to an external/internal issue tracker and make the production code harder to maintain. Please replace them with neutral explanatory comments (or remove if redundant) that don’t rely on issue numbers for context. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
