Copilot commented on code in PR #7330:
URL: https://github.com/apache/paimon/pull/7330#discussion_r2908907726


##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorIndexOptions.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for Lumina vector index. */
+public class LuminaVectorIndexOptions {
+
+    public static final ConfigOption<Integer> VECTOR_DIM =
+            ConfigOptions.key("vector.dim")
+                    .intType()
+                    .defaultValue(128)
+                    .withDescription("The dimension of the vector");
+
+    public static final ConfigOption<LuminaVectorMetric> VECTOR_METRIC =
+            ConfigOptions.key("vector.metric")
+                    .enumType(LuminaVectorMetric.class)
+                    .defaultValue(LuminaVectorMetric.L2)
+                    .withDescription(
+                            "The distance metric for vector search (L2, 
COSINE, INNER_PRODUCT)");
+
+    public static final ConfigOption<LuminaIndexType> VECTOR_INDEX_TYPE =
+            ConfigOptions.key("vector.index-type")
+                    .enumType(LuminaIndexType.class)
+                    .defaultValue(LuminaIndexType.DISKANN)
+                    .withDescription("The type of Lumina index (DISKANN)");
+
+    public static final ConfigOption<String> VECTOR_ENCODING_TYPE =
+            ConfigOptions.key("vector.encoding-type")
+                    .stringType()
+                    .defaultValue("rawf32")
+                    .withDescription("The encoding type for vectors (rawf32, 
sq8, pq)");
+
+    public static final ConfigOption<Integer> VECTOR_SIZE_PER_INDEX =
+            ConfigOptions.key("vector.size-per-index")
+                    .intType()
+                    .defaultValue(2_000_000)
+                    .withDescription("The number of vectors stored in each 
index file");
+
+    public static final ConfigOption<Integer> VECTOR_TRAINING_SIZE =
+            ConfigOptions.key("vector.training-size")
+                    .intType()
+                    .defaultValue(500_000)
+                    .withDescription(
+                            "The number of vectors to use for pretraining 
DiskANN indices");
+
+    public static final ConfigOption<Integer> VECTOR_SEARCH_FACTOR =
+            ConfigOptions.key("vector.search-factor")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The multiplier for the search limit when 
filtering is applied");
+
+    public static final ConfigOption<Integer> VECTOR_DISKANN_SEARCH_LIST_SIZE =
+            ConfigOptions.key("vector.diskann.search-list-size")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("The search list size for DiskANN search 
(list_size)");
+
+    public static final ConfigOption<Double> PRETRAIN_SAMPLE_RATIO =
+            ConfigOptions.key("vector.pretrain-sample-ratio")
+                    .doubleType()
+                    .defaultValue(1.0)
+                    .withDescription(
+                            "The sample ratio for pretraining (Lumina's 
pretrain.sample_ratio)");
+
+    public static final ConfigOption<Integer> DISKANN_EF_CONSTRUCTION =
+            ConfigOptions.key("vector.diskann.ef-construction")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "DiskANN build ef_construction parameter. "
+                                    + "Controls the size of the dynamic 
candidate list during graph construction.");
+
+    public static final ConfigOption<Integer> DISKANN_NEIGHBOR_COUNT =
+            ConfigOptions.key("vector.diskann.neighbor-count")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "DiskANN build neighbor count. "
+                                    + "Maximum number of neighbors per node in 
the graph.");
+
+    public static final ConfigOption<Integer> DISKANN_BUILD_THREAD_COUNT =
+            ConfigOptions.key("vector.diskann.build-thread-count")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Number of threads used for DiskANN index 
building.");
+
+    public static final ConfigOption<Long> VECTOR_BUILD_MEMORY_LIMIT =
+            ConfigOptions.key("vector.build-memory-limit")
+                    .longType()
+                    .defaultValue(2L * 1024 * 1024 * 1024)
+                    .withDescription(
+                            "Maximum bytes of vector data buffered in memory 
per index during building. "
+                                    + "When the configured sizePerIndex would 
exceed this limit for a given "
+                                    + "dimension, it is automatically 
reduced.");
+
+    private final int dimension;
+    private final LuminaVectorMetric metric;
+    private final LuminaIndexType indexType;
+    private final String encodingType;
+    private final int sizePerIndex;
+    private final int trainingSize;
+    private final int searchFactor;
+    private final int searchListSize;
+    private final double pretrainSampleRatio;
+    private final Integer diskannEfConstruction;
+    private final Integer diskannNeighborCount;
+    private final Integer diskannBuildThreadCount;
+    private final long buildMemoryLimit;
+
+    public LuminaVectorIndexOptions(Options options) {
+        this.dimension = options.get(VECTOR_DIM);
+        this.metric = options.get(VECTOR_METRIC);
+        this.indexType = options.get(VECTOR_INDEX_TYPE);

Review Comment:
   `vector.dim` isn’t validated. A configured value of 0 (or negative) will 
cause division-by-zero / invalid buffer sizing in the writer and can lead to 
invalid JNI calls. Please validate `vector.dim > 0` in 
`LuminaVectorIndexOptions` (and consider validating other numeric options are 
positive as well).



##########
paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java:
##########
@@ -142,6 +142,22 @@ public List<Range> toRangeList() {
         return Range.toRanges(roaring64NavigableMap::iterator);
     }
 
+    /**
+     * Returns true if there is at least one value in the range [rangeMin, 
rangeMax] (inclusive on
+     * both ends) contained in this bitmap.
+     *
+     * <p>Uses {@code rankLong} for O(log N) performance instead of iterating 
all values.
+     */
+    public boolean intersectsRange(long rangeMin, long rangeMax) {
+        if (rangeMin > rangeMax) {
+            throw new IllegalArgumentException(
+                    "rangeMin (" + rangeMin + ") must be <= rangeMax (" + 
rangeMax + ")");
+        }
+        long countUpToMax = roaring64NavigableMap.rankLong(rangeMax);
+        long countBeforeMin = rangeMin <= 0 ? 0 : 
roaring64NavigableMap.rankLong(rangeMin - 1);
+        return countUpToMax > countBeforeMin;

Review Comment:
   `intersectsRange` is helpful for fast range checks, but callers (e.g., 
Lumina’s filtered search) also need an efficient way to extract IDs within a 
range without scanning from the beginning. Consider exposing rank/select or a 
range-aware iterator in `RoaringNavigableMap64` so range slicing can be 
implemented efficiently.



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java:
##########
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.aliyun.lumina.LuminaFileInput;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+/**
+ * Vector global index reader using Lumina.
+ *
+ * <p>This reader loads Lumina indices from global index files and performs 
vector similarity
+ * search.
+ */
+public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final LuminaIndex[] indices;
+    private final LuminaIndexMeta[] indexMetas;
+    private final List<SeekableInputStream> openStreams;
+    private final List<GlobalIndexIOMeta> ioMetas;
+    private final GlobalIndexFileReader fileReader;
+    private final DataType fieldType;
+    private final LuminaVectorIndexOptions options;
+    private volatile boolean metasLoaded = false;
+    private volatile boolean indicesLoaded = false;
+
+    public LuminaVectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader,
+            List<GlobalIndexIOMeta> ioMetas,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileReader = fileReader;
+        this.ioMetas = ioMetas;
+        this.fieldType = fieldType;
+        this.options = options;
+        this.indices = new LuminaIndex[ioMetas.size()];
+        this.indexMetas = new LuminaIndexMeta[ioMetas.size()];
+        this.openStreams = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
+        try {
+            ensureLoadMetas();
+
+            RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+            // Fix #12: Capture loaded indices snapshot under synchronized to 
ensure visibility.
+            LuminaIndex[] loadedIndices;
+            LuminaIndexMeta[] loadedMetas;
+
+            if (includeRowIds != null) {
+                List<Integer> matchingIndices = new ArrayList<>();
+                for (int i = 0; i < indexMetas.length; i++) {
+                    LuminaIndexMeta meta = indexMetas[i];
+                    // Fix #3: Renamed from containsRange to intersectsRange.
+                    if (includeRowIds.intersectsRange(meta.minId(), 
meta.maxId())) {
+                        matchingIndices.add(i);
+                    }
+                }
+                if (matchingIndices.isEmpty()) {
+                    return Optional.empty();
+                }
+                ensureLoadIndices(matchingIndices);
+            } else {
+                ensureLoadAllIndices();
+            }
+
+            // Fix #12: Take a snapshot of indices/metas under lock for 
visibility.
+            synchronized (this) {
+                loadedIndices = indices.clone();
+                loadedMetas = indexMetas.clone();
+            }
+
+            return Optional.ofNullable(search(vectorSearch, loadedIndices, 
loadedMetas));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to search Lumina vector index with 
fieldName=%s, limit=%d",
+                            vectorSearch.fieldName(), vectorSearch.limit()),
+                    e);
+        }
+    }
+
+    private GlobalIndexResult search(
+            VectorSearch vectorSearch, LuminaIndex[] loadedIndices, 
LuminaIndexMeta[] loadedMetas)
+            throws IOException {
+        validateVectorType(vectorSearch.vector());
+        float[] queryVector = ((float[]) vectorSearch.vector()).clone();
+        int limit = vectorSearch.limit();
+
+        // Min-heap: smallest score at head, so we can evict the weakest 
candidate efficiently.
+        PriorityQueue<ScoredRow> topK =
+                new PriorityQueue<>(limit + 1, Comparator.comparingDouble(s -> 
s.score));
+
+        RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+        // Fix #10: Materialize filter IDs per-shard from bitmap, avoiding 
full materialization.
+
+        Map<String, String> filterSearchOptions = null;
+        Map<String, String> plainSearchOptions = null;
+        if (includeRowIds != null) {
+            filterSearchOptions = new LinkedHashMap<>();
+            // Fix #7: Use long arithmetic to avoid integer overflow.
+            int listSize = (int) Math.min((long) limit * 
options.searchFactor(), Integer.MAX_VALUE);
+            listSize = Math.max(listSize, options.searchListSize());
+            filterSearchOptions.put("diskann.search.list_size", 
String.valueOf(listSize));
+            filterSearchOptions.put("search.thread_safe_filter", "true");
+        } else {
+            plainSearchOptions = new LinkedHashMap<>();
+            int listSize = Math.max(limit, options.searchListSize());
+            plainSearchOptions.put("diskann.search.list_size", 
String.valueOf(listSize));
+        }
+
+        for (int i = 0; i < loadedIndices.length; i++) {
+            LuminaIndex index = loadedIndices[i];
+            if (index == null) {
+                continue;
+            }
+
+            int effectiveK = (int) Math.min(limit, index.size());
+            if (effectiveK <= 0) {
+                continue;
+            }
+
+            if (includeRowIds != null) {
+                LuminaIndexMeta meta = loadedMetas[i];
+                // Fix #10: Extract filter IDs for this shard directly from 
bitmap.
+                long[] scopedIds = extractIdsInRange(includeRowIds, 
meta.minId(), meta.maxId());
+                if (scopedIds.length == 0) {
+                    continue;
+                }
+                effectiveK = (int) Math.min(effectiveK, scopedIds.length);
+
+                float[] distances = new float[effectiveK];
+                long[] labels = new long[effectiveK];
+                index.searchWithFilter(
+                        queryVector,
+                        1,
+                        effectiveK,
+                        distances,
+                        labels,
+                        scopedIds,
+                        filterSearchOptions);
+                collectResults(distances, labels, effectiveK, limit, topK);
+            } else {
+                float[] distances = new float[effectiveK];
+                long[] labels = new long[effectiveK];
+                index.search(queryVector, 1, effectiveK, distances, labels, 
plainSearchOptions);
+                collectResults(distances, labels, effectiveK, limit, topK);
+            }
+        }
+
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        HashMap<Long, Float> id2scores = new HashMap<>(topK.size());
+        for (ScoredRow row : topK) {
+            roaringBitmap64.add(row.rowId);
+            id2scores.put(row.rowId, row.score);
+        }
+        return new LuminaScoredGlobalIndexResult(roaringBitmap64, id2scores);
+    }
+
+    /**
+     * Collect search results into a min-heap of size {@code limit}. The heap 
keeps the top-k
+     * highest-scoring rows; rows with score lower than the current minimum 
are discarded once the
+     * heap is full.
+     */
+    private void collectResults(
+            float[] distances, long[] labels, int count, int limit, 
PriorityQueue<ScoredRow> topK) {
+        for (int i = 0; i < count; i++) {
+            long rowId = labels[i];
+            if (rowId < 0) {
+                continue;
+            }
+            float score = convertDistanceToScore(distances[i]);
+            if (topK.size() < limit) {
+                topK.offer(new ScoredRow(rowId, score));
+            } else if (score > topK.peek().score) {
+                topK.poll();
+                topK.offer(new ScoredRow(rowId, score));
+            }
+        }
+    }
+
+    /**
+     * Extract IDs from the bitmap that fall within [minId, maxId] without 
materializing the entire
+     * bitmap. Uses the bitmap iterator and collects only IDs in the given 
range.
+     */
+    private static long[] extractIdsInRange(RoaringNavigableMap64 bitmap, long 
minId, long maxId) {
+        List<Long> ids = new ArrayList<>();
+        for (long id : bitmap) {
+            if (id > maxId) {
+                break;

Review Comment:
   `extractIdsInRange` iterates the bitmap from the beginning for every index 
file and only stops after `id > maxId`. With large `includeRowIds` and many 
index files this becomes O(numFiles * cardinality) and can be a significant 
bottleneck. Consider adding a range-aware extraction method (e.g., 
rank/select-based) to `RoaringNavigableMap64`, or pre-partitioning the filter 
once per query, to avoid rescanning early IDs.



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java:
##########
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.aliyun.lumina.LuminaFileInput;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+/**
+ * Vector global index reader using Lumina.
+ *
+ * <p>This reader loads Lumina indices from global index files and performs 
vector similarity
+ * search.
+ */
+public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final LuminaIndex[] indices;
+    private final LuminaIndexMeta[] indexMetas;
+    private final List<SeekableInputStream> openStreams;
+    private final List<GlobalIndexIOMeta> ioMetas;
+    private final GlobalIndexFileReader fileReader;
+    private final DataType fieldType;
+    private final LuminaVectorIndexOptions options;
+    private volatile boolean metasLoaded = false;
+    private volatile boolean indicesLoaded = false;
+
+    public LuminaVectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader,
+            List<GlobalIndexIOMeta> ioMetas,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileReader = fileReader;
+        this.ioMetas = ioMetas;
+        this.fieldType = fieldType;
+        this.options = options;
+        this.indices = new LuminaIndex[ioMetas.size()];
+        this.indexMetas = new LuminaIndexMeta[ioMetas.size()];
+        this.openStreams = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
+        try {
+            ensureLoadMetas();
+
+            RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+            // Fix #12: Capture loaded indices snapshot under synchronized to 
ensure visibility.
+            LuminaIndex[] loadedIndices;
+            LuminaIndexMeta[] loadedMetas;
+
+            if (includeRowIds != null) {
+                List<Integer> matchingIndices = new ArrayList<>();
+                for (int i = 0; i < indexMetas.length; i++) {
+                    LuminaIndexMeta meta = indexMetas[i];
+                    // Fix #3: Renamed from containsRange to intersectsRange.
+                    if (includeRowIds.intersectsRange(meta.minId(), 
meta.maxId())) {
+                        matchingIndices.add(i);
+                    }
+                }
+                if (matchingIndices.isEmpty()) {
+                    return Optional.empty();
+                }
+                ensureLoadIndices(matchingIndices);
+            } else {
+                ensureLoadAllIndices();
+            }
+
+            // Fix #12: Take a snapshot of indices/metas under lock for 
visibility.
+            synchronized (this) {
+                loadedIndices = indices.clone();
+                loadedMetas = indexMetas.clone();
+            }
+
+            return Optional.ofNullable(search(vectorSearch, loadedIndices, 
loadedMetas));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to search Lumina vector index with 
fieldName=%s, limit=%d",
+                            vectorSearch.fieldName(), vectorSearch.limit()),
+                    e);
+        }
+    }
+
+    private GlobalIndexResult search(
+            VectorSearch vectorSearch, LuminaIndex[] loadedIndices, 
LuminaIndexMeta[] loadedMetas)
+            throws IOException {
+        validateVectorType(vectorSearch.vector());
+        float[] queryVector = ((float[]) vectorSearch.vector()).clone();
+        int limit = vectorSearch.limit();
+

Review Comment:
   The query vector’s dimension isn’t validated against the index dimension. If 
a caller passes a `float[]` of the wrong length, the JNI 
`search`/`searchWithFilter` call may fail or crash the JVM. Please compare the 
query length against the expected dimension (from metadata or options) and 
throw an `IllegalArgumentException` with a clear message before invoking Lumina.



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import scala.collection.JavaConverters._
+
+/** Tests for Lumina vector index read/write operations. */
+class LuminaVectorIndexTest extends PaimonSparkTestBase {
+

Review Comment:
   These Spark UTs unconditionally exercise Lumina index creation/search, but 
(unlike the JUnit tests in `paimon-lumina`) they don’t guard against the Lumina 
JNI native library being unavailable. In environments where 
`Lumina.loadLibrary()` fails, this suite will fail rather than being skipped. 
Consider checking `Lumina.isLibraryLoaded()`/`Lumina.loadLibrary()` in the test 
setup and using a ScalaTest assumption to skip when the native library can’t be 
loaded.



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorIndexOptions.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for Lumina vector index. */
+public class LuminaVectorIndexOptions {
+
+    public static final ConfigOption<Integer> VECTOR_DIM =
+            ConfigOptions.key("vector.dim")
+                    .intType()
+                    .defaultValue(128)
+                    .withDescription("The dimension of the vector");
+
+    public static final ConfigOption<LuminaVectorMetric> VECTOR_METRIC =
+            ConfigOptions.key("vector.metric")
+                    .enumType(LuminaVectorMetric.class)
+                    .defaultValue(LuminaVectorMetric.L2)
+                    .withDescription(
+                            "The distance metric for vector search (L2, 
COSINE, INNER_PRODUCT)");
+
+    public static final ConfigOption<LuminaIndexType> VECTOR_INDEX_TYPE =
+            ConfigOptions.key("vector.index-type")
+                    .enumType(LuminaIndexType.class)
+                    .defaultValue(LuminaIndexType.DISKANN)
+                    .withDescription("The type of Lumina index (DISKANN)");

Review Comment:
   `VECTOR_INDEX_TYPE` is configured via `enumType(LuminaIndexType.class)`, 
which means users can set `vector.index-type=UNKNOWN`. That value is meant as a 
deserialization fallback and will likely fail when passed into Lumina’s 
builder/searcher. Please reject `UNKNOWN` (or make it non-configurable) and 
throw a clear `IllegalArgumentException` when configured.



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java:
##########
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.aliyun.lumina.LuminaFileInput;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+/**
+ * Vector global index reader using Lumina.
+ *
+ * <p>This reader loads Lumina indices from global index files and performs 
vector similarity
+ * search.
+ */
+public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final LuminaIndex[] indices;
+    private final LuminaIndexMeta[] indexMetas;
+    private final List<SeekableInputStream> openStreams;
+    private final List<GlobalIndexIOMeta> ioMetas;
+    private final GlobalIndexFileReader fileReader;
+    private final DataType fieldType;
+    private final LuminaVectorIndexOptions options;
+    private volatile boolean metasLoaded = false;
+    private volatile boolean indicesLoaded = false;
+
+    public LuminaVectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader,
+            List<GlobalIndexIOMeta> ioMetas,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileReader = fileReader;
+        this.ioMetas = ioMetas;
+        this.fieldType = fieldType;
+        this.options = options;
+        this.indices = new LuminaIndex[ioMetas.size()];
+        this.indexMetas = new LuminaIndexMeta[ioMetas.size()];
+        this.openStreams = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
+        try {
+            ensureLoadMetas();
+
+            RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+            // Fix #12: Capture loaded indices snapshot under synchronized to 
ensure visibility.
+            LuminaIndex[] loadedIndices;
+            LuminaIndexMeta[] loadedMetas;
+
+            if (includeRowIds != null) {
+                List<Integer> matchingIndices = new ArrayList<>();
+                for (int i = 0; i < indexMetas.length; i++) {
+                    LuminaIndexMeta meta = indexMetas[i];
+                    // Fix #3: Renamed from containsRange to intersectsRange.
+                    if (includeRowIds.intersectsRange(meta.minId(), 
meta.maxId())) {
+                        matchingIndices.add(i);
+                    }
+                }
+                if (matchingIndices.isEmpty()) {
+                    return Optional.empty();
+                }
+                ensureLoadIndices(matchingIndices);
+            } else {
+                ensureLoadAllIndices();
+            }
+
+            // Fix #12: Take a snapshot of indices/metas under lock for 
visibility.
+            synchronized (this) {
+                loadedIndices = indices.clone();
+                loadedMetas = indexMetas.clone();
+            }
+
+            return Optional.ofNullable(search(vectorSearch, loadedIndices, 
loadedMetas));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to search Lumina vector index with 
fieldName=%s, limit=%d",
+                            vectorSearch.fieldName(), vectorSearch.limit()),
+                    e);
+        }
+    }
+
+    private GlobalIndexResult search(
+            VectorSearch vectorSearch, LuminaIndex[] loadedIndices, 
LuminaIndexMeta[] loadedMetas)
+            throws IOException {
+        validateVectorType(vectorSearch.vector());
+        float[] queryVector = ((float[]) vectorSearch.vector()).clone();
+        int limit = vectorSearch.limit();
+
+        // Min-heap: smallest score at head, so we can evict the weakest 
candidate efficiently.
+        PriorityQueue<ScoredRow> topK =
+                new PriorityQueue<>(limit + 1, Comparator.comparingDouble(s -> 
s.score));
+
+        RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+        // Fix #10: Materialize filter IDs per-shard from bitmap, avoiding 
full materialization.
+
+        Map<String, String> filterSearchOptions = null;
+        Map<String, String> plainSearchOptions = null;
+        if (includeRowIds != null) {
+            filterSearchOptions = new LinkedHashMap<>();
+            // Fix #7: Use long arithmetic to avoid integer overflow.
+            int listSize = (int) Math.min((long) limit * 
options.searchFactor(), Integer.MAX_VALUE);
+            listSize = Math.max(listSize, options.searchListSize());
+            filterSearchOptions.put("diskann.search.list_size", 
String.valueOf(listSize));
+            filterSearchOptions.put("search.thread_safe_filter", "true");
+        } else {
+            plainSearchOptions = new LinkedHashMap<>();
+            int listSize = Math.max(limit, options.searchListSize());
+            plainSearchOptions.put("diskann.search.list_size", 
String.valueOf(listSize));
+        }
+
+        for (int i = 0; i < loadedIndices.length; i++) {
+            LuminaIndex index = loadedIndices[i];
+            if (index == null) {
+                continue;
+            }
+
+            int effectiveK = (int) Math.min(limit, index.size());
+            if (effectiveK <= 0) {
+                continue;
+            }
+
+            if (includeRowIds != null) {
+                LuminaIndexMeta meta = loadedMetas[i];
+                // Fix #10: Extract filter IDs for this shard directly from 
bitmap.
+                long[] scopedIds = extractIdsInRange(includeRowIds, 
meta.minId(), meta.maxId());
+                if (scopedIds.length == 0) {
+                    continue;
+                }
+                effectiveK = (int) Math.min(effectiveK, scopedIds.length);
+
+                float[] distances = new float[effectiveK];
+                long[] labels = new long[effectiveK];
+                index.searchWithFilter(
+                        queryVector,
+                        1,
+                        effectiveK,
+                        distances,
+                        labels,
+                        scopedIds,
+                        filterSearchOptions);
+                collectResults(distances, labels, effectiveK, limit, topK);
+            } else {
+                float[] distances = new float[effectiveK];
+                long[] labels = new long[effectiveK];
+                index.search(queryVector, 1, effectiveK, distances, labels, 
plainSearchOptions);
+                collectResults(distances, labels, effectiveK, limit, topK);
+            }
+        }
+
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        HashMap<Long, Float> id2scores = new HashMap<>(topK.size());
+        for (ScoredRow row : topK) {
+            roaringBitmap64.add(row.rowId);
+            id2scores.put(row.rowId, row.score);
+        }
+        return new LuminaScoredGlobalIndexResult(roaringBitmap64, id2scores);
+    }
+
+    /**
+     * Collect search results into a min-heap of size {@code limit}. The heap 
keeps the top-k
+     * highest-scoring rows; rows with score lower than the current minimum 
are discarded once the
+     * heap is full.
+     */
+    private void collectResults(
+            float[] distances, long[] labels, int count, int limit, 
PriorityQueue<ScoredRow> topK) {
+        for (int i = 0; i < count; i++) {
+            long rowId = labels[i];
+            if (rowId < 0) {
+                continue;
+            }
+            float score = convertDistanceToScore(distances[i]);
+            if (topK.size() < limit) {
+                topK.offer(new ScoredRow(rowId, score));
+            } else if (score > topK.peek().score) {
+                topK.poll();
+                topK.offer(new ScoredRow(rowId, score));
+            }
+        }
+    }
+
+    /**
+     * Extract IDs from the bitmap that fall within [minId, maxId] without 
materializing the entire
+     * bitmap. Uses the bitmap iterator and collects only IDs in the given 
range.
+     */
+    private static long[] extractIdsInRange(RoaringNavigableMap64 bitmap, long 
minId, long maxId) {
+        List<Long> ids = new ArrayList<>();
+        for (long id : bitmap) {
+            if (id > maxId) {
+                break;
+            }
+            if (id >= minId) {
+                ids.add(id);
+            }
+        }
+        long[] result = new long[ids.size()];
+        for (int i = 0; i < ids.size(); i++) {
+            result[i] = ids.get(i);
+        }
+        return result;
+    }
+
+    private float convertDistanceToScore(float distance) {
+        if (options.metric() == LuminaVectorMetric.L2) {
+            return 1.0f / (1.0f + distance);
+        } else if (options.metric() == LuminaVectorMetric.COSINE) {
+            // Cosine distance is in [0, 2]; convert to similarity in [-1, 1]

Review Comment:
   Distance→score conversion is based on `options.metric()`, but the true 
metric for an index file is recorded in its serialized metadata 
(`LuminaIndexMeta.metricValue()`) and table options may change over time. If 
options and metadata diverge, score conversion (and ordering) becomes 
incorrect. Consider using the metric from the loaded `LuminaIndexMeta` (or 
validating consistency and using that) when converting distances.



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java:
##########
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import org.aliyun.lumina.LuminaFileOutput;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.FloatBuffer;
+import java.nio.LongBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Vector global index writer using Lumina.
+ *
+ * <p>Vectors are collected until the current index reaches {@code 
sizePerIndex} vectors, then
+ * pretrained, inserted in a single batch, and dumped to a file. DiskANN 
requires exactly one
+ * pretrain and one insertBatch call per index.
+ *
+ * <p>Each written vector is assigned a monotonically increasing 64-bit row ID 
({@code count}) that
+ * spans across all produced index files. The second index file's IDs 
therefore start from {@code
+ * sizePerIndex}, not from 0. The min/max IDs stored in {@link 
LuminaIndexMeta} reflect this global
+ * range, enabling the reader to skip index files that have no overlap with a 
given filter set.
+ */
+public class LuminaVectorGlobalIndexWriter implements 
GlobalIndexSingletonWriter, Closeable {
+
+    /** Initial buffer capacity to avoid over-allocating for small datasets. */
+    private static final int INITIAL_BUFFER_CAPACITY = 4096;
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final LuminaVectorIndexOptions options;
+    private final int sizePerIndex;
+    private final int dim;
+
+    private long count = 0; // monotonically increasing global row ID across 
all index files
+    private long currentIndexMinId = Long.MAX_VALUE;
+    private long currentIndexMaxId = Long.MIN_VALUE;
+    private ByteBuffer pendingVectors;
+    private ByteBuffer pendingIds;
+    private FloatBuffer pendingFloatView;
+    private LongBuffer pendingLongView;
+    private int pendingCount = 0;
+    private int bufferCapacity;
+    private final List<ResultEntry> results;
+
+    public LuminaVectorGlobalIndexWriter(
+            GlobalIndexFileWriter fileWriter,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileWriter = fileWriter;
+        this.options = options;
+        this.dim = options.dimension();
+        int configuredSize = options.sizePerIndex();
+        long buildMemoryLimit = options.buildMemoryLimit();
+        int maxByDim =
+                (int) Math.min(configuredSize, buildMemoryLimit / ((long) dim 
* Float.BYTES));
+        this.sizePerIndex = Math.max(maxByDim, 1);
+        // Fix #8: Start with a smaller buffer, grow on demand up to 
sizePerIndex.
+        this.bufferCapacity = Math.min(INITIAL_BUFFER_CAPACITY, sizePerIndex);
+        this.pendingVectors = LuminaIndex.allocateVectorBuffer(bufferCapacity, 
dim);

Review Comment:
   Please remove the in-code review artifacts like `Fix #8` / `Fix #6` etc. 
These references won’t be meaningful to future readers and will become stale; 
replace with neutral rationale comments where needed, or drop them if the code 
is self-explanatory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to