Copilot commented on code in PR #7330:
URL: https://github.com/apache/paimon/pull/7330#discussion_r2909066102


##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.aliyun.lumina.LuminaFileInput;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+/**
+ * Vector global index reader using Lumina.
+ *
+ * <p>This reader loads Lumina indices from global index files and performs 
vector similarity
+ * search.
+ */
+public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final LuminaIndex[] indices;
+    private final LuminaIndexMeta[] indexMetas;
+    private final List<SeekableInputStream> openStreams;
+    private final List<GlobalIndexIOMeta> ioMetas;
+    private final GlobalIndexFileReader fileReader;
+    private final DataType fieldType;
+    private final LuminaVectorIndexOptions options;
+    private volatile boolean metasLoaded = false;
+    private volatile boolean indicesLoaded = false;
+
+    public LuminaVectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader,
+            List<GlobalIndexIOMeta> ioMetas,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileReader = fileReader;
+        this.ioMetas = ioMetas;
+        this.fieldType = fieldType;
+        this.options = options;
+        this.indices = new LuminaIndex[ioMetas.size()];
+        this.indexMetas = new LuminaIndexMeta[ioMetas.size()];
+        this.openStreams = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
+        try {
+            ensureLoadMetas();
+
+            RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+            // Fix #12: Capture loaded indices snapshot under synchronized to 
ensure visibility.
+            LuminaIndex[] loadedIndices;
+            LuminaIndexMeta[] loadedMetas;
+
+            if (includeRowIds != null) {
+                List<Integer> matchingIndices = new ArrayList<>();
+                for (int i = 0; i < indexMetas.length; i++) {
+                    LuminaIndexMeta meta = indexMetas[i];
+                    // Fix #3: Renamed from containsRange to intersectsRange.
+                    if (includeRowIds.intersectsRange(meta.minId(), 
meta.maxId())) {
+                        matchingIndices.add(i);
+                    }
+                }
+                if (matchingIndices.isEmpty()) {
+                    return Optional.empty();
+                }
+                ensureLoadIndices(matchingIndices);
+            } else {
+                ensureLoadAllIndices();
+            }
+
+            // Fix #12: Take a snapshot of indices/metas under lock for 
visibility.
+            synchronized (this) {
+                loadedIndices = indices.clone();
+                loadedMetas = indexMetas.clone();
+            }
+
+            return Optional.ofNullable(search(vectorSearch, loadedIndices, 
loadedMetas));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to search Lumina vector index with 
fieldName=%s, limit=%d",
+                            vectorSearch.fieldName(), vectorSearch.limit()),
+                    e);
+        }
+    }
+
+    private GlobalIndexResult search(
+            VectorSearch vectorSearch, LuminaIndex[] loadedIndices, 
LuminaIndexMeta[] loadedMetas)
+            throws IOException {
+        validateVectorType(vectorSearch.vector());
+        float[] queryVector = ((float[]) vectorSearch.vector()).clone();
+        int limit = vectorSearch.limit();
+
+        // Min-heap: smallest score at head, so we can evict the weakest 
candidate efficiently.
+        PriorityQueue<ScoredRow> topK =
+                new PriorityQueue<>(limit + 1, Comparator.comparingDouble(s -> 
s.score));
+
+        RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+        // Fix #10: Materialize filter IDs per-shard from bitmap, avoiding 
full materialization.
+
+        Map<String, String> filterSearchOptions = null;
+        Map<String, String> plainSearchOptions = null;
+        if (includeRowIds != null) {
+            filterSearchOptions = new LinkedHashMap<>();
+            // Fix #7: Use long arithmetic to avoid integer overflow.
+            int listSize = (int) Math.min((long) limit * 
options.searchFactor(), Integer.MAX_VALUE);
+            listSize = Math.max(listSize, options.searchListSize());
+            filterSearchOptions.put("diskann.search.list_size", 
String.valueOf(listSize));
+            filterSearchOptions.put("search.thread_safe_filter", "true");
+        } else {
+            plainSearchOptions = new LinkedHashMap<>();
+            int listSize = Math.max(limit, options.searchListSize());
+            plainSearchOptions.put("diskann.search.list_size", 
String.valueOf(listSize));
+        }
+
+        for (int i = 0; i < loadedIndices.length; i++) {
+            LuminaIndex index = loadedIndices[i];
+            if (index == null) {
+                continue;
+            }
+
+            int effectiveK = (int) Math.min(limit, index.size());
+            if (effectiveK <= 0) {
+                continue;
+            }
+
+            if (includeRowIds != null) {
+                LuminaIndexMeta meta = loadedMetas[i];
+                // Fix #10: Extract filter IDs for this shard directly from 
bitmap.
+                long[] scopedIds = extractIdsInRange(includeRowIds, 
meta.minId(), meta.maxId());
+                if (scopedIds.length == 0) {
+                    continue;
+                }
+                effectiveK = (int) Math.min(effectiveK, scopedIds.length);
+
+                float[] distances = new float[effectiveK];
+                long[] labels = new long[effectiveK];
+                index.searchWithFilter(
+                        queryVector,
+                        1,
+                        effectiveK,
+                        distances,
+                        labels,
+                        scopedIds,
+                        filterSearchOptions);
+                collectResults(distances, labels, effectiveK, limit, topK);
+            } else {
+                float[] distances = new float[effectiveK];
+                long[] labels = new long[effectiveK];
+                index.search(queryVector, 1, effectiveK, distances, labels, 
plainSearchOptions);
+                collectResults(distances, labels, effectiveK, limit, topK);
+            }
+        }
+
+        RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+        HashMap<Long, Float> id2scores = new HashMap<>(topK.size());
+        for (ScoredRow row : topK) {
+            roaringBitmap64.add(row.rowId);
+            id2scores.put(row.rowId, row.score);
+        }
+        return new LuminaScoredGlobalIndexResult(roaringBitmap64, id2scores);
+    }
+
+    /**
+     * Collect search results into a min-heap of size {@code limit}. The heap 
keeps the top-k
+     * highest-scoring rows; rows with score lower than the current minimum 
are discarded once the
+     * heap is full.
+     */
+    private void collectResults(
+            float[] distances, long[] labels, int count, int limit, 
PriorityQueue<ScoredRow> topK) {
+        for (int i = 0; i < count; i++) {
+            long rowId = labels[i];
+            if (rowId < 0) {
+                continue;
+            }
+            float score = convertDistanceToScore(distances[i]);
+            if (topK.size() < limit) {
+                topK.offer(new ScoredRow(rowId, score));
+            } else if (score > topK.peek().score) {
+                topK.poll();
+                topK.offer(new ScoredRow(rowId, score));
+            }
+        }
+    }
+
+    /**
+     * Extract IDs from the bitmap that fall within [minId, maxId] without 
materializing the entire
+     * bitmap. Uses the bitmap iterator and collects only IDs in the given 
range.
+     */
+    private static long[] extractIdsInRange(RoaringNavigableMap64 bitmap, long 
minId, long maxId) {
+        List<Long> ids = new ArrayList<>();
+        for (long id : bitmap) {
+            if (id > maxId) {
+                break;
+            }
+            if (id >= minId) {
+                ids.add(id);
+            }
+        }
+        long[] result = new long[ids.size()];
+        for (int i = 0; i < ids.size(); i++) {
+            result[i] = ids.get(i);
+        }
+        return result;
+    }
+
+    private float convertDistanceToScore(float distance) {
+        if (options.metric() == LuminaVectorMetric.L2) {
+            return 1.0f / (1.0f + distance);
+        } else if (options.metric() == LuminaVectorMetric.COSINE) {
+            // Cosine distance is in [0, 2]; convert to similarity in [-1, 1]
+            return 1.0f - distance;
+        } else {
+            // Inner product is already a similarity
+            return distance;
+        }
+    }
+
+    private void validateVectorType(Object vector) {
+        if (!(vector instanceof float[])) {
+            throw new IllegalArgumentException(
+                    "Expected float[] vector but got: " + vector.getClass());
+        }
+        if (!(fieldType instanceof ArrayType)
+                || !(((ArrayType) fieldType).getElementType() instanceof 
FloatType)) {
+            throw new IllegalArgumentException(
+                    "Lumina currently only supports float arrays, but field 
type is: " + fieldType);
+        }
+    }
+
+    private void ensureLoadMetas() throws IOException {
+        if (!metasLoaded) {
+            synchronized (this) {
+                if (!metasLoaded) {
+                    for (int i = 0; i < ioMetas.size(); i++) {
+                        byte[] metaBytes = ioMetas.get(i).metadata();
+                        indexMetas[i] = LuminaIndexMeta.deserialize(metaBytes);
+                    }
+                    metasLoaded = true;
+                }
+            }
+        }
+    }
+
+    private void ensureLoadAllIndices() throws IOException {
+        if (!indicesLoaded) {
+            synchronized (this) {
+                if (!indicesLoaded) {
+                    for (int i = 0; i < ioMetas.size(); i++) {
+                        if (indices[i] == null) {
+                            loadIndexAt(i);
+                        }
+                    }
+                    indicesLoaded = true;
+                }
+            }
+        }
+    }
+
+    private void ensureLoadIndices(List<Integer> positions) throws IOException 
{
+        synchronized (this) {
+            for (int pos : positions) {
+                if (indices[pos] == null) {
+                    loadIndexAt(pos);
+                }
+            }
+            // Check if all indices are now loaded.
+            if (!indicesLoaded) {
+                boolean allLoaded = true;
+                for (LuminaIndex idx : indices) {
+                    if (idx == null) {
+                        allLoaded = false;
+                        break;
+                    }
+                }
+                if (allLoaded) {
+                    indicesLoaded = true;
+                }
+            }
+        }
+    }
+
+    private void loadIndexAt(int position) throws IOException {
+        GlobalIndexIOMeta ioMeta = ioMetas.get(position);
+        SeekableInputStream in = fileReader.getInputStream(ioMeta);
+        LuminaIndex index = null;
+        try {
+            index = loadIndex(in, position, ioMeta.fileSize());
+            openStreams.add(in);
+            indices[position] = index;
+        } catch (Exception e) {
+            IOUtils.closeQuietly(index);
+            IOUtils.closeQuietly(in);
+            throw e;
+        }
+    }
+
+    private LuminaIndex loadIndex(SeekableInputStream in, int position, long 
fileSize)
+            throws IOException {
+        LuminaIndexMeta meta = indexMetas[position];
+        LuminaVectorMetric metric = 
LuminaVectorMetric.fromValue(meta.metricValue());
+        LuminaIndexType indexType = meta.indexType();
+
+        // Fix #11: Reject UNKNOWN index type instead of passing it to JNI.
+        if (indexType == LuminaIndexType.UNKNOWN) {
+            throw new IOException(
+                    "Unsupported Lumina index type in metadata at position "
+                            + position
+                            + ". The index file may have been created by a 
newer version.");
+        }
+
+        LuminaFileInput fileInput = new InputStreamFileInput(in);
+
+        // Fix #5: Pass search-time options to LuminaSearcher.create().
+        Map<String, String> extraOptions = new LinkedHashMap<>();
+        extraOptions.put("diskann.search.list_size", 
String.valueOf(options.searchListSize()));
+        return LuminaIndex.fromStream(
+                fileInput, fileSize, meta.dim(), metric, indexType, 
extraOptions);

Review Comment:
   More "Fix #..." comments appear here (e.g., "Fix #11", "Fix #5") around the 
index loading logic. Please remove/replace these with self-contained 
explanations so the code doesn’t depend on undocumented issue references.



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import org.aliyun.lumina.LuminaFileOutput;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.FloatBuffer;
+import java.nio.LongBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/** Vector global index writer using Lumina. */
+public class LuminaVectorGlobalIndexWriter implements 
GlobalIndexSingletonWriter, Closeable {
+
+    private static final String FILE_NAME_PREFIX = "lumina";
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final LuminaVectorIndexOptions options;
+    private final int sizePerIndex;
+    private final int dim;
+
+    private long count = 0; // monotonically increasing global row ID across 
all index files
+    private long currentIndexMinId = Long.MAX_VALUE;
+    private long currentIndexMaxId = Long.MIN_VALUE;
+    private List<float[]> pendingVectors;
+    private final List<ResultEntry> results;
+
+    public LuminaVectorGlobalIndexWriter(
+            GlobalIndexFileWriter fileWriter,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileWriter = fileWriter;
+        this.options = options;
+        this.dim = options.dimension();
+        int configuredSize = options.sizePerIndex();
+        long buildMemoryLimit = options.buildMemoryLimit();
+        int maxByDim =
+                (int) Math.min(configuredSize, buildMemoryLimit / ((long) dim 
* Float.BYTES));
+        this.sizePerIndex = Math.max(maxByDim, 1);
+        this.pendingVectors = new ArrayList<>();
+        this.results = new ArrayList<>();
+
+        validateFieldType(fieldType);
+    }
+
+    private void validateFieldType(DataType dataType) {
+        if (!(dataType instanceof ArrayType)) {
+            throw new IllegalArgumentException(
+                    "Lumina vector index requires ArrayType, but got: " + 
dataType);
+        }
+        DataType elementType = ((ArrayType) dataType).getElementType();
+        if (!(elementType instanceof FloatType)) {
+            throw new IllegalArgumentException(
+                    "Lumina vector index requires float array, but got: " + 
elementType);
+        }
+    }
+
+    @Override
+    public void write(Object fieldData) {
+        float[] vector;
+        if (fieldData instanceof float[]) {
+            vector = ((float[]) fieldData).clone();
+        } else if (fieldData instanceof InternalArray) {
+            vector = ((InternalArray) fieldData).toFloatArray();
+        } else {
+            throw new RuntimeException(
+                    "Unsupported vector type: " + 
fieldData.getClass().getName());
+        }
+        checkDimension(vector);
+
+        pendingVectors.add(vector);
+        currentIndexMinId = Math.min(currentIndexMinId, count);
+        currentIndexMaxId = Math.max(currentIndexMaxId, count);
+        count++;
+
+        try {
+            if (pendingVectors.size() >= sizePerIndex) {
+                buildAndFlushIndex();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public List<ResultEntry> finish() {
+        try {
+            if (!pendingVectors.isEmpty()) {
+                buildAndFlushIndex();
+            }
+            return results;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write Lumina vector global 
index", e);
+        }
+    }
+
+    /**
+     * Build a complete DiskANN index from the accumulated vectors: create 
DirectByteBuffers on
+     * demand, pretrain, insert all vectors in a single batch, dump directly 
to the output stream,
+     * and release buffers.
+     */
+    private void buildAndFlushIndex() throws IOException {
+        int n = pendingVectors.size();
+        if (n == 0) {
+            return;
+        }
+
+        LuminaIndex index = createIndex();
+
+        try {
+            // Build the full vector buffer from accumulated vectors.
+            ByteBuffer vectorBuffer = buildVectorBuffer(pendingVectors);
+            ByteBuffer idBuffer = buildIdBuffer(n, currentIndexMinId);
+
+            // Pretrain phase.
+            int trainingSize = Math.min(n, options.trainingSize());
+            if (trainingSize == n) {
+                index.pretrain(vectorBuffer, n);
+            } else {
+                int[] sampleIndices = reservoirSample(n, trainingSize);
+                ByteBuffer trainingBuffer = 
LuminaIndex.allocateVectorBuffer(trainingSize, dim);
+                FloatBuffer trainingView = trainingBuffer.asFloatBuffer();
+                for (int i = 0; i < trainingSize; i++) {
+                    trainingView.put(pendingVectors.get(sampleIndices[i]));
+                }
+                index.pretrain(trainingBuffer, trainingSize);
+            }
+
+            // Insert phase.
+            index.insertBatch(vectorBuffer, idBuffer, n);
+
+            // Dump to output stream — direct streaming, no temp files.
+            String fileName = fileWriter.newFileName(FILE_NAME_PREFIX);
+            try (PositionOutputStream out = 
fileWriter.newOutputStream(fileName)) {
+                index.dump(new OutputStreamFileOutput(out));
+                out.flush();
+            }
+
+            LuminaIndexMeta meta =
+                    new LuminaIndexMeta(
+                            dim,
+                            options.metric().getValue(),
+                            options.indexType().name(),
+                            n,
+                            currentIndexMinId,
+                            currentIndexMaxId);
+            results.add(new ResultEntry(fileName, n, meta.serialize()));
+        } finally {
+            index.close();
+        }

Review Comment:
   In `buildAndFlushIndex`, `index.close()` is unconditionally called in 
`finally`. If `createIndex()` throws (e.g., native library missing/invalid 
options), `index` will be null and the `finally` will throw an NPE masking the 
original failure. Guard the close or restructure so `index` is only closed when 
successfully created.



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import scala.collection.JavaConverters._
+
+/** Tests for Lumina vector index read/write operations. */
+class LuminaVectorIndexTest extends PaimonSparkTestBase {
+
+  private val indexType = "lumina-vector-ann"
+  private val defaultOptions = "vector.dim=3,vector.index-type=DISKANN"
+
+  // ========== Index Creation Tests ==========

Review Comment:
   Unlike the JUnit Lumina tests (which skip when the native library can't be 
loaded), this Spark suite has no guard and will fail if `lumina-jni` natives 
aren’t available in the test runtime. Please add a suite-level 
`assume(...)`/skip that attempts `Lumina.loadLibrary()` and skips when 
unavailable, to keep CI/test runs reliable on environments without the native 
binaries.



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import org.aliyun.lumina.LuminaFileOutput;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.FloatBuffer;
+import java.nio.LongBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/** Vector global index writer using Lumina. */
+public class LuminaVectorGlobalIndexWriter implements 
GlobalIndexSingletonWriter, Closeable {
+
+    private static final String FILE_NAME_PREFIX = "lumina";
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final LuminaVectorIndexOptions options;
+    private final int sizePerIndex;
+    private final int dim;
+
+    private long count = 0; // monotonically increasing global row ID across 
all index files
+    private long currentIndexMinId = Long.MAX_VALUE;
+    private long currentIndexMaxId = Long.MIN_VALUE;
+    private List<float[]> pendingVectors;
+    private final List<ResultEntry> results;
+
+    public LuminaVectorGlobalIndexWriter(
+            GlobalIndexFileWriter fileWriter,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileWriter = fileWriter;
+        this.options = options;
+        this.dim = options.dimension();
+        int configuredSize = options.sizePerIndex();
+        long buildMemoryLimit = options.buildMemoryLimit();
+        int maxByDim =
+                (int) Math.min(configuredSize, buildMemoryLimit / ((long) dim 
* Float.BYTES));
+        this.sizePerIndex = Math.max(maxByDim, 1);
+        this.pendingVectors = new ArrayList<>();
+        this.results = new ArrayList<>();
+
+        validateFieldType(fieldType);
+    }
+
+    private void validateFieldType(DataType dataType) {
+        if (!(dataType instanceof ArrayType)) {
+            throw new IllegalArgumentException(
+                    "Lumina vector index requires ArrayType, but got: " + 
dataType);
+        }
+        DataType elementType = ((ArrayType) dataType).getElementType();
+        if (!(elementType instanceof FloatType)) {
+            throw new IllegalArgumentException(
+                    "Lumina vector index requires float array, but got: " + 
elementType);
+        }
+    }
+
+    @Override
+    public void write(Object fieldData) {
+        float[] vector;
+        if (fieldData instanceof float[]) {
+            vector = ((float[]) fieldData).clone();
+        } else if (fieldData instanceof InternalArray) {
+            vector = ((InternalArray) fieldData).toFloatArray();
+        } else {
+            throw new RuntimeException(
+                    "Unsupported vector type: " + 
fieldData.getClass().getName());
+        }

Review Comment:
   `GlobalIndexSingletonWriter.write` accepts `@Nullable` values, but this 
implementation dereferences `fieldData` in the error path 
(`fieldData.getClass()`) and will NPE on null vectors. Please handle `null` 
explicitly (e.g., skip indexing for null vectors or throw a clear 
IllegalArgumentException).



##########
paimon-lumina/pom.xml:
##########
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-parent</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.4-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-lumina</artifactId>
+    <name>Paimon : Lumina Index</name>
+
+    <repositories>
+        <repository>
+            <id>lumina</id>
+            
<url>https://lumina-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review Comment:
   `paimon-common` is currently a compile-scope dependency here. Other optional 
integration modules (e.g., `paimon-lance`) declare `paimon-common` as 
`provided` to avoid bundling Paimon core classes into the plugin JAR and 
creating version conflicts. Consider aligning by switching this dependency to 
`provided` (and adding test-jar deps if needed for tests).



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.aliyun.lumina.LuminaFileInput;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+/**
+ * Vector global index reader using Lumina.
+ *
+ * <p>This reader loads Lumina indices from global index files and performs 
vector similarity
+ * search.
+ */
+public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
+
+    private final LuminaIndex[] indices;
+    private final LuminaIndexMeta[] indexMetas;
+    private final List<SeekableInputStream> openStreams;
+    private final List<GlobalIndexIOMeta> ioMetas;
+    private final GlobalIndexFileReader fileReader;
+    private final DataType fieldType;
+    private final LuminaVectorIndexOptions options;
+    private volatile boolean metasLoaded = false;
+    private volatile boolean indicesLoaded = false;
+
+    public LuminaVectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader,
+            List<GlobalIndexIOMeta> ioMetas,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileReader = fileReader;
+        this.ioMetas = ioMetas;
+        this.fieldType = fieldType;
+        this.options = options;
+        this.indices = new LuminaIndex[ioMetas.size()];
+        this.indexMetas = new LuminaIndexMeta[ioMetas.size()];
+        this.openStreams = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
+        try {
+            ensureLoadMetas();
+
+            RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+            // Fix #12: Capture loaded indices snapshot under synchronized to 
ensure visibility.
+            LuminaIndex[] loadedIndices;
+            LuminaIndexMeta[] loadedMetas;
+
+            if (includeRowIds != null) {

Review Comment:
   The inline comments like "Fix #12" / "Fix #3" read like references to an 
external/internal issue tracker and make the production code harder to 
maintain. Please replace them with neutral explanatory comments (or remove if 
redundant) that don’t rely on issue numbers for context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to