Copilot commented on code in PR #7330:
URL: https://github.com/apache/paimon/pull/7330#discussion_r2902866446


##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorIndexOptions.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for Lumina vector index. */
+public class LuminaVectorIndexOptions {
+
+    public static final ConfigOption<Integer> VECTOR_DIM =
+            ConfigOptions.key("vector.dim")
+                    .intType()
+                    .defaultValue(128)
+                    .withDescription("The dimension of the vector");
+
+    public static final ConfigOption<LuminaVectorMetric> VECTOR_METRIC =
+            ConfigOptions.key("vector.metric")
+                    .enumType(LuminaVectorMetric.class)
+                    .defaultValue(LuminaVectorMetric.L2)
+                    .withDescription(
+                            "The distance metric for vector search (L2, 
COSINE, INNER_PRODUCT)");
+
+    public static final ConfigOption<LuminaIndexType> VECTOR_INDEX_TYPE =
+            ConfigOptions.key("vector.index-type")
+                    .enumType(LuminaIndexType.class)
+                    .defaultValue(LuminaIndexType.DISKANN)
+                    .withDescription("The type of Lumina index (DISKANN)");
+
+    public static final ConfigOption<String> VECTOR_ENCODING_TYPE =
+            ConfigOptions.key("vector.encoding-type")
+                    .stringType()
+                    .defaultValue("rawf32")
+                    .withDescription("The encoding type for vectors (rawf32, 
sq8, pq)");
+
+    public static final ConfigOption<Integer> VECTOR_SIZE_PER_INDEX =
+            ConfigOptions.key("vector.size-per-index")
+                    .intType()
+                    .defaultValue(2_000_000)
+                    .withDescription("The number of vectors stored in each 
index file");
+
+    public static final ConfigOption<Integer> VECTOR_TRAINING_SIZE =
+            ConfigOptions.key("vector.training-size")
+                    .intType()
+                    .defaultValue(500_000)
+                    .withDescription(
+                            "The number of vectors to use for pretraining 
DiskANN indices");
+
+    public static final ConfigOption<Integer> VECTOR_SEARCH_FACTOR =
+            ConfigOptions.key("vector.search-factor")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The multiplier for the search limit when 
filtering is applied");
+
+    public static final ConfigOption<Boolean> VECTOR_NORMALIZE =
+            ConfigOptions.key("vector.normalize")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to L2 normalize vectors before indexing 
and searching");
+
+    public static final ConfigOption<Integer> VECTOR_DISKANN_SEARCH_LIST_SIZE =
+            ConfigOptions.key("vector.diskann.search-list-size")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("The search list size for DiskANN search 
(list_size)");
+
+    public static final ConfigOption<Double> PRETRAIN_SAMPLE_RATIO =
+            ConfigOptions.key("vector.pretrain-sample-ratio")
+                    .doubleType()
+                    .defaultValue(1.0)
+                    .withDescription(
+                            "The sample ratio for pretraining (Lumina's 
pretrain.sample_ratio)");
+
+    private final int dimension;
+    private final LuminaVectorMetric metric;
+    private final LuminaIndexType indexType;
+    private final String encodingType;
+    private final int sizePerIndex;
+    private final int trainingSize;
+    private final int searchFactor;
+    private final int searchListSize;
+    private final boolean normalize;
+    private final double pretrainSampleRatio;
+
+    public LuminaVectorIndexOptions(Options options) {
+        this.dimension = options.get(VECTOR_DIM);
+        this.metric = options.get(VECTOR_METRIC);
+        this.indexType = options.get(VECTOR_INDEX_TYPE);
+        this.encodingType = options.get(VECTOR_ENCODING_TYPE);
+
+        int sizePerIndexValue = options.get(VECTOR_SIZE_PER_INDEX);
+        if (sizePerIndexValue <= 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Invalid value for '%s': %d. Must be a positive 
integer.",
+                            VECTOR_SIZE_PER_INDEX.key(), sizePerIndexValue));
+        }
+        this.sizePerIndex = sizePerIndexValue;
+
+        this.trainingSize = options.get(VECTOR_TRAINING_SIZE);
+        this.searchFactor = options.get(VECTOR_SEARCH_FACTOR);
+        this.searchListSize = options.get(VECTOR_DISKANN_SEARCH_LIST_SIZE);
+        this.normalize = options.get(VECTOR_NORMALIZE);
+        this.pretrainSampleRatio = options.get(PRETRAIN_SAMPLE_RATIO);

Review Comment:
   Several numeric options are read without validation (e.g.,
   `vector.training-size`, `vector.search-factor`,
   `vector.diskann.search-list-size`, `vector.pretrain-sample-ratio`). Zero or
   negative values can lead to invalid native calls (e.g., `pretrain(..., 0)`)
   or unexpected allocations. Add bounds checks that throw
   IllegalArgumentException with a clear message, as is already done for
   `vector.size-per-index`; the suggestion below also restricts the sample
   ratio to (0.0, 1.0].
   ```suggestion
           int trainingSizeValue = options.get(VECTOR_TRAINING_SIZE);
           if (trainingSizeValue <= 0) {
               throw new IllegalArgumentException(
                       String.format(
                               "Invalid value for '%s': %d. Must be a positive integer.",
                               VECTOR_TRAINING_SIZE.key(), trainingSizeValue));
           }
           this.trainingSize = trainingSizeValue;
   
           int searchFactorValue = options.get(VECTOR_SEARCH_FACTOR);
           if (searchFactorValue <= 0) {
               throw new IllegalArgumentException(
                       String.format(
                               "Invalid value for '%s': %d. Must be a positive integer.",
                               VECTOR_SEARCH_FACTOR.key(), searchFactorValue));
           }
           this.searchFactor = searchFactorValue;
   
           int searchListSizeValue = options.get(VECTOR_DISKANN_SEARCH_LIST_SIZE);
           if (searchListSizeValue <= 0) {
               throw new IllegalArgumentException(
                       String.format(
                               "Invalid value for '%s': %d. Must be a positive integer.",
                               VECTOR_DISKANN_SEARCH_LIST_SIZE.key(), searchListSizeValue));
           }
           this.searchListSize = searchListSizeValue;
   
           this.normalize = options.get(VECTOR_NORMALIZE);
   
           double pretrainSampleRatioValue = options.get(PRETRAIN_SAMPLE_RATIO);
           if (pretrainSampleRatioValue <= 0.0 || pretrainSampleRatioValue > 1.0) {
               throw new IllegalArgumentException(
                       String.format(
                               "Invalid value for '%s': %s. Must be > 0.0 and <= 1.0.",
                               PRETRAIN_SAMPLE_RATIO.key(), pretrainSampleRatioValue));
           }
           this.pretrainSampleRatio = pretrainSampleRatioValue;
   ```



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.UUID;
+
+/**
+ * Vector global index reader using Lumina.
+ *
+ * <p>This reader loads Lumina indices from global index files and performs 
vector similarity
+ * search.
+ */
+public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
+
+    /**
+     * Upper bound for searchK to prevent allocating excessively large arrays 
when includeRowIds
+     * cardinality is very high.
+     */
+    private static final int MAX_SEARCH_K = 100_000;
+
+    private final LuminaIndex[] indices;
+    private final LuminaIndexMeta[] indexMetas;
+    private final List<File> localIndexFiles;
+    private final List<GlobalIndexIOMeta> ioMetas;
+    private final GlobalIndexFileReader fileReader;
+    private final DataType fieldType;
+    private final LuminaVectorIndexOptions options;
+    private volatile boolean metasLoaded = false;
+    private volatile boolean indicesLoaded = false;
+
+    public LuminaVectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader,
+            List<GlobalIndexIOMeta> ioMetas,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileReader = fileReader;
+        this.ioMetas = ioMetas;
+        this.fieldType = fieldType;
+        this.options = options;
+        this.indices = new LuminaIndex[ioMetas.size()];
+        this.indexMetas = new LuminaIndexMeta[ioMetas.size()];
+        this.localIndexFiles = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
+        try {
+            ensureLoadMetas();
+
+            RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+            if (includeRowIds != null) {
+                List<Integer> matchingIndices = new ArrayList<>();
+                for (int i = 0; i < indexMetas.length; i++) {
+                    LuminaIndexMeta meta = indexMetas[i];
+                    if (includeRowIds.containsRange(meta.minId(), 
meta.maxId())) {
+                        matchingIndices.add(i);
+                    }
+                }
+                if (matchingIndices.isEmpty()) {
+                    return Optional.empty();
+                }
+                ensureLoadIndices(matchingIndices);
+            } else {
+                ensureLoadAllIndices();
+            }
+
+            return Optional.ofNullable(search(vectorSearch));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to search Lumina vector index with 
fieldName=%s, limit=%d",
+                            vectorSearch.fieldName(), vectorSearch.limit()),
+                    e);
+        }
+    }
+
+    private GlobalIndexResult search(VectorSearch vectorSearch) throws 
IOException {
+        validateVectorType(vectorSearch.vector());
+        float[] queryVector = ((float[]) vectorSearch.vector()).clone();
+        if (options.normalize()) {
+            LuminaVectorUtils.normalizeL2(queryVector);
+        }
+        int limit = vectorSearch.limit();
+
+        PriorityQueue<ScoredRow> result =
+                new PriorityQueue<>(Comparator.comparingDouble(sr -> 
sr.score));
+
+        RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+        int searchK = limit;
+        if (includeRowIds != null) {
+            searchK =
+                    Math.min(
+                            MAX_SEARCH_K,
+                            Math.max(
+                                    limit * options.searchFactor(),
+                                    (int) includeRowIds.getLongCardinality()));
+        }

Review Comment:
   The `searchK` computation casts `includeRowIds.getLongCardinality()` to
   `int`. If the cardinality exceeds `Integer.MAX_VALUE`, the cast overflows and
   can incorrectly reduce `searchK` (instead of clamping to `MAX_SEARCH_K`); the
   `int` product `limit * options.searchFactor()` can overflow in the same way.
   Compute with `long` and clamp to `[limit, MAX_SEARCH_K]` before casting.
   ```suggestion
           long searchKLong = limit;
           if (includeRowIds != null) {
               long limitTimesFactor = (long) limit * options.searchFactor();
               long includeCardinality = includeRowIds.getLongCardinality();
               long candidate = Math.max(limitTimesFactor, includeCardinality);
               // Ensure searchK is at least the requested limit
               candidate = Math.max(candidate, limit);
               long maxSearchKLong = MAX_SEARCH_K;
               searchKLong = Math.min(candidate, maxSearchKLong);
           }
           int searchK = (int) searchKLong;
   ```



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorIndexOptions.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for Lumina vector index. */
+public class LuminaVectorIndexOptions {
+
+    public static final ConfigOption<Integer> VECTOR_DIM =
+            ConfigOptions.key("vector.dim")
+                    .intType()
+                    .defaultValue(128)
+                    .withDescription("The dimension of the vector");
+
+    public static final ConfigOption<LuminaVectorMetric> VECTOR_METRIC =
+            ConfigOptions.key("vector.metric")
+                    .enumType(LuminaVectorMetric.class)
+                    .defaultValue(LuminaVectorMetric.L2)
+                    .withDescription(
+                            "The distance metric for vector search (L2, 
COSINE, INNER_PRODUCT)");
+
+    public static final ConfigOption<LuminaIndexType> VECTOR_INDEX_TYPE =
+            ConfigOptions.key("vector.index-type")
+                    .enumType(LuminaIndexType.class)
+                    .defaultValue(LuminaIndexType.DISKANN)
+                    .withDescription("The type of Lumina index (DISKANN)");
+
+    public static final ConfigOption<String> VECTOR_ENCODING_TYPE =
+            ConfigOptions.key("vector.encoding-type")
+                    .stringType()
+                    .defaultValue("rawf32")
+                    .withDescription("The encoding type for vectors (rawf32, 
sq8, pq)");
+
+    public static final ConfigOption<Integer> VECTOR_SIZE_PER_INDEX =
+            ConfigOptions.key("vector.size-per-index")
+                    .intType()
+                    .defaultValue(2_000_000)
+                    .withDescription("The number of vectors stored in each 
index file");
+
+    public static final ConfigOption<Integer> VECTOR_TRAINING_SIZE =
+            ConfigOptions.key("vector.training-size")
+                    .intType()
+                    .defaultValue(500_000)
+                    .withDescription(
+                            "The number of vectors to use for pretraining 
DiskANN indices");
+
+    public static final ConfigOption<Integer> VECTOR_SEARCH_FACTOR =
+            ConfigOptions.key("vector.search-factor")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The multiplier for the search limit when 
filtering is applied");
+
+    public static final ConfigOption<Boolean> VECTOR_NORMALIZE =
+            ConfigOptions.key("vector.normalize")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to L2 normalize vectors before indexing 
and searching");
+
+    public static final ConfigOption<Integer> VECTOR_DISKANN_SEARCH_LIST_SIZE =
+            ConfigOptions.key("vector.diskann.search-list-size")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("The search list size for DiskANN search 
(list_size)");
+
+    public static final ConfigOption<Double> PRETRAIN_SAMPLE_RATIO =
+            ConfigOptions.key("vector.pretrain-sample-ratio")
+                    .doubleType()
+                    .defaultValue(1.0)
+                    .withDescription(
+                            "The sample ratio for pretraining (Lumina's 
pretrain.sample_ratio)");
+
+    private final int dimension;
+    private final LuminaVectorMetric metric;
+    private final LuminaIndexType indexType;
+    private final String encodingType;
+    private final int sizePerIndex;
+    private final int trainingSize;
+    private final int searchFactor;
+    private final int searchListSize;
+    private final boolean normalize;
+    private final double pretrainSampleRatio;
+
+    public LuminaVectorIndexOptions(Options options) {
+        this.dimension = options.get(VECTOR_DIM);

Review Comment:
   `vector.dim` is read without validation. A non-positive dimension will later 
lead to invalid Lumina builder/searcher setup and/or invalid buffer sizing. 
Consider validating `vector.dim > 0` up-front and failing fast with an 
IllegalArgumentException.
   ```suggestion
           int dimensionValue = options.get(VECTOR_DIM);
           if (dimensionValue <= 0) {
               throw new IllegalArgumentException(
                       String.format(
                               "Invalid value for '%s': %d. Must be a positive integer.",
                               VECTOR_DIM.key(), dimensionValue));
           }
           this.dimension = dimensionValue;
   ```



##########
paimon-lumina/pom.xml:
##########
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-parent</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.4-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-lumina</artifactId>
+    <name>Paimon : Lumina Index</name>
+
+    <repositories>
+        <repository>
+            <id>lumina</id>
+            
<url>https://lumina-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url>
+        </repository>
+    </repositories>

Review Comment:
   This module adds a custom Maven repository 
(`https://lumina-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/`). This has 
build reproducibility and supply-chain implications (and may violate 
ASF/release expectations if artifacts aren't in Maven Central / ASF repos). If 
possible, depend on artifacts published to Maven Central (or an ASF-managed 
repo), or gate this repository behind an explicit Maven profile so default 
builds don’t rely on an extra remote repository.
   ```suggestion
       <profiles>
           <profile>
               <id>lumina-repo</id>
               <activation>
                   <activeByDefault>false</activeByDefault>
               </activation>
               <repositories>
                   <repository>
                       <id>lumina</id>
                       
<url>https://lumina-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url>
                   </repository>
               </repositories>
           </profile>
       </profiles>
   ```



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.FloatBuffer;
+import java.nio.LongBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Vector global index writer using Lumina.
+ *
+ * <p>Vectors are collected until the current index reaches {@code 
sizePerIndex} vectors, then
+ * pretrained, inserted in a single batch, and dumped to a file. DiskANN 
requires exactly one
+ * pretrain and one insertBatch call per index.
+ *
+ * <p>Each written vector is assigned a monotonically increasing 64-bit row ID 
({@code count}) that
+ * spans across all produced index files. The second index file's IDs 
therefore start from {@code
+ * sizePerIndex}, not from 0. The min/max IDs stored in {@link 
LuminaIndexMeta} reflect this global
+ * range, enabling the reader to skip index files that have no overlap with a 
given filter set.
+ */
+public class LuminaVectorGlobalIndexWriter implements 
GlobalIndexSingletonWriter, Closeable {
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final LuminaVectorIndexOptions options;
+    private final int sizePerIndex;
+    private final int dim;
+    private final DataType fieldType;
+
+    private long count = 0; // monotonically increasing global row ID across 
all index files
+    private long currentIndexMinId = Long.MAX_VALUE;
+    private long currentIndexMaxId = Long.MIN_VALUE;
+    private final List<VectorEntry> pendingBatch;
+    private final List<ResultEntry> results;
+
+    public LuminaVectorGlobalIndexWriter(
+            GlobalIndexFileWriter fileWriter,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileWriter = fileWriter;
+        this.fieldType = fieldType;
+        this.options = options;
+        this.sizePerIndex = options.sizePerIndex();
+        this.dim = options.dimension();
+        this.pendingBatch = new ArrayList<>();
+        this.results = new ArrayList<>();
+
+        validateFieldType(fieldType);
+    }
+
+    private void validateFieldType(DataType dataType) {
+        if (!(dataType instanceof ArrayType)) {
+            throw new IllegalArgumentException(
+                    "Lumina vector index requires ArrayType, but got: " + 
dataType);
+        }
+        DataType elementType = ((ArrayType) dataType).getElementType();
+        if (!(elementType instanceof FloatType)) {
+            throw new IllegalArgumentException(
+                    "Lumina vector index requires float array, but got: " + 
elementType);
+        }
+    }
+
+    @Override
+    public void write(Object fieldData) {
+        float[] vector;
+        if (fieldData instanceof float[]) {
+            vector = (float[]) fieldData;
+        } else if (fieldData instanceof InternalArray) {
+            vector = ((InternalArray) fieldData).toFloatArray();
+        } else {
+            throw new RuntimeException(
+                    "Unsupported vector type: " + 
fieldData.getClass().getName());
+        }
+        checkDimension(vector);
+        if (options.normalize()) {
+            LuminaVectorUtils.normalizeL2(vector);
+        }
+        currentIndexMinId = Math.min(currentIndexMinId, count);
+        currentIndexMaxId = Math.max(currentIndexMaxId, count);
+        pendingBatch.add(new VectorEntry(count, vector));
+        count++;
+
+        try {
+            if (pendingBatch.size() >= sizePerIndex) {
+                buildAndFlushIndex();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   `loadIndexAt` wraps IOExceptions, but `write` rethrows them as `new 
RuntimeException(e)` without any context. Please include a descriptive message 
(e.g., that flushing/building the Lumina index failed and which stage/file was 
involved) to make failures diagnosable in production logs.
   ```suggestion
               throw new RuntimeException(
                       "Failed to build or flush Lumina vector global index during write", e);
   ```



##########
paimon-lumina/README.md:
##########
@@ -0,0 +1,36 @@
+## Paimon Lumina
+
+This module integrates 
[Lumina](https://github.com/alibaba/paimon-cpp/tree/main/third_party/lumina)
+as a vector index for Apache Paimon's global index framework.
+
+Lumina vector search library is derived from an internal repository maintained 
by
+Alibaba Storage Service Team. It is accessed via JNI through the `lumina-jni` 
artifact.
+
+### Supported Index Types
+
+| Index Type | Description |
+|------------|-------------|
+| **DISKANN** | DiskANN graph-based index (default) |
+

Review Comment:
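   The markdown tables use a leading double pipe (`|| ...`) which renders as an
   extra empty column in most markdown renderers. Switch to standard table syntax
   with a single leading pipe (`| ...`) for proper formatting. (Note: in the
   quoted hunk above the leading `+` is the diff marker, so the rows shown there
   already start with a single `|`; check the file itself before changing it.)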



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import scala.collection.JavaConverters._
+
+/** Tests for Lumina vector index read/write operations. */
+class LuminaVectorIndexTest extends PaimonSparkTestBase {
+
+  private val indexType = "lumina-vector-ann"
+  private val defaultOptions = "vector.dim=3,vector.index-type=DISKANN"
+

Review Comment:
   These Spark UTs don’t guard against missing Lumina native libraries (unlike 
the JUnit tests in `paimon-lumina`, which call `Lumina.loadLibrary()` and 
`assumeTrue(false, ...)` on failure). Without a similar check/skip here, CI 
environments lacking the native library will fail hard with 
UnsatisfiedLinkError. Add a class-level setup that attempts to load Lumina and 
skips the suite when unavailable.



##########
paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java:
##########
@@ -142,6 +142,22 @@ public List<Range> toRangeList() {
         return Range.toRanges(roaring64NavigableMap::iterator);
     }
 
+    /**
+     * Returns true if there is at least one value in the range [minId, maxId] 
(inclusive on both
+     * ends) contained in this bitmap.
+     *
+     * <p>Uses {@code rankLong} for O(log N) performance instead of iterating 
all values.
+     */
+    public boolean containsRange(long minId, long maxId) {
+        if (minId > maxId) {
+            throw new IllegalArgumentException(
+                    "minId (" + minId + ") must be <= maxId (" + maxId + ")");
+        }
+        long countUpToMax = roaring64NavigableMap.rankLong(maxId);
+        long countBeforeMin = minId > 0 ? roaring64NavigableMap.rankLong(minId 
- 1) : 0;

Review Comment:
   `containsRange` is incorrect for negative `minId`: the `minId > 0` shortcut 
sets `countBeforeMin` to 0 for every `minId <= 0`, not only for 
`minId == Long.MIN_VALUE`, where `minId - 1` would overflow and must not be 
evaluated. This can produce false positives/negatives when the range starts 
below 0. Consider computing `countBeforeMin` via `rankLong(minId - 1)` for all 
`minId` except `Long.MIN_VALUE` (where it must be 0), instead of the current 
`minId > 0` shortcut.
   ```suggestion
           long countBeforeMin =
                   (minId == Long.MIN_VALUE) ? 0 : 
roaring64NavigableMap.rankLong(minId - 1);
   ```



##########
paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.UUID;
+
+/**
+ * Vector global index reader using Lumina.
+ *
+ * <p>This reader loads Lumina indices from global index files and performs 
vector similarity
+ * search.
+ */
+public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
+
+    /**
+     * Upper bound for searchK to prevent allocating excessively large arrays 
when includeRowIds
+     * cardinality is very high.
+     */
+    private static final int MAX_SEARCH_K = 100_000;
+
+    private final LuminaIndex[] indices;
+    private final LuminaIndexMeta[] indexMetas;
+    private final List<File> localIndexFiles;
+    private final List<GlobalIndexIOMeta> ioMetas;
+    private final GlobalIndexFileReader fileReader;
+    private final DataType fieldType;
+    private final LuminaVectorIndexOptions options;
+    private volatile boolean metasLoaded = false;
+    private volatile boolean indicesLoaded = false;
+
+    public LuminaVectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader,
+            List<GlobalIndexIOMeta> ioMetas,
+            DataType fieldType,
+            LuminaVectorIndexOptions options) {
+        this.fileReader = fileReader;
+        this.ioMetas = ioMetas;
+        this.fieldType = fieldType;
+        this.options = options;
+        this.indices = new LuminaIndex[ioMetas.size()];
+        this.indexMetas = new LuminaIndexMeta[ioMetas.size()];
+        this.localIndexFiles = Collections.synchronizedList(new ArrayList<>());
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
+        try {
+            ensureLoadMetas();
+
+            RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+
+            if (includeRowIds != null) {
+                List<Integer> matchingIndices = new ArrayList<>();
+                for (int i = 0; i < indexMetas.length; i++) {
+                    LuminaIndexMeta meta = indexMetas[i];
+                    if (includeRowIds.containsRange(meta.minId(), 
meta.maxId())) {
+                        matchingIndices.add(i);
+                    }
+                }
+                if (matchingIndices.isEmpty()) {
+                    return Optional.empty();
+                }
+                ensureLoadIndices(matchingIndices);
+            } else {
+                ensureLoadAllIndices();
+            }
+
+            return Optional.ofNullable(search(vectorSearch));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to search Lumina vector index with 
fieldName=%s, limit=%d",
+                            vectorSearch.fieldName(), vectorSearch.limit()),
+                    e);
+        }
+    }
+
+    private GlobalIndexResult search(VectorSearch vectorSearch) throws 
IOException {
+        validateVectorType(vectorSearch.vector());
+        float[] queryVector = ((float[]) vectorSearch.vector()).clone();
+        if (options.normalize()) {
+            LuminaVectorUtils.normalizeL2(queryVector);
+        }
+        int limit = vectorSearch.limit();

Review Comment:
   There is no dimension check for the query vector before calling into Lumina 
native search. If a user passes a vector of the wrong length, this may fail 
with a native error or cause undefined behavior. Add an explicit check that 
`((float[]) vectorSearch.vector()).length` matches the index dimension (from 
`options.dimension()` and/or `LuminaIndexMeta.dim()`), and throw a clear 
`IllegalArgumentException` when it does not.



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import scala.collection.JavaConverters._
+
+/** Tests for Lumina vector index read/write operations. */
+class LuminaVectorIndexTest extends PaimonSparkTestBase {
+
+  private val indexType = "lumina-vector-ann"
+  private val defaultOptions = "vector.dim=3,vector.index-type=DISKANN"
+
+  // ========== Index Creation Tests ==========
+
+  test("create lumina vector index - basic") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 100)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), 
cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 
'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+
+      assert(indexEntries.nonEmpty)
+      val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 100L)
+    }
+  }
+
+  test("create lumina vector index - with different index types") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (

Review Comment:
   Test name says "with different index types", but the test only creates a 
single index using the hard-coded `vector.index-type=DISKANN` (via 
`defaultOptions`). Either extend the test to actually cover multiple supported 
index types or rename it to match what it verifies.



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import scala.collection.JavaConverters._
+
+/** Tests for Lumina vector index read/write operations. */
+class LuminaVectorIndexTest extends PaimonSparkTestBase {
+
+  private val indexType = "lumina-vector-ann"
+  private val defaultOptions = "vector.dim=3,vector.index-type=DISKANN"
+
+  // ========== Index Creation Tests ==========
+
+  test("create lumina vector index - basic") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 100)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), 
cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 
'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+
+      assert(indexEntries.nonEmpty)
+      val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 100L)
+    }
+  }
+
+  test("create lumina vector index - with different index types") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 50)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), 
cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 
'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+
+      assert(indexEntries.nonEmpty)
+    }
+  }
+
+  test("create lumina vector index - with partitioned table") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      var values = (0 until 500)
+        .map(
+          i =>
+            s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i 
+ 2} as float)), 'p0')")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 300)
+        .map(
+          i =>
+            s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i 
+ 2} as float)), 'p1')")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 
'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+
+      assert(indexEntries.nonEmpty)
+      val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 800L)
+    }
+  }
+
+  // ========== Index Write Tests ==========
+
+  test("write vectors - large dataset") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 10000)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), 
cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 
'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+
+      val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 10000L)
+    }
+  }
+
+  // ========== Index Read/Search Tests ==========
+
+  test("read vectors - basic search") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 100)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), 
cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 
'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+
+      val result = spark
+        .sql("""
+               |SELECT * FROM vector_search('T', 'v', array(50.0f, 51.0f, 
52.0f), 5)
+               |""".stripMargin)
+        .collect()
+      assert(result.length == 5)
+    }
+  }
+
+  test("read vectors - top-k search with different k values") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 200)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), 
cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 
'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+
+      // Test with k=1
+      var result = spark
+        .sql("""
+               |SELECT * FROM vector_search('T', 'v', array(100.0f, 101.0f, 
102.0f), 1)
+               |""".stripMargin)
+        .collect()
+      assert(result.length == 1)
+
+      // Test with k=10
+      result = spark
+        .sql("""
+               |SELECT * FROM vector_search('T', 'v', array(100.0f, 101.0f, 
102.0f), 10)
+               |""".stripMargin)
+        .collect()
+      assert(result.length == 10)
+    }
+  }
+
+  test("read vectors - multiple concurrent searches") {

Review Comment:
   This test is labeled "multiple concurrent searches", but the searches are 
executed sequentially on the same thread. Consider renaming the test (or 
implementing actual concurrency) to avoid giving a false sense of thread-safety 
coverage.
   ```suggestion
     test("read vectors - multiple searches with different queries") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to