This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed42a28f5e [lumina] Support null vectors in Lumina index writer (#7988)
ed42a28f5e is described below

commit ed42a28f5ec07a44ed378cb03e957a820d1115aa
Author: jerry <[email protected]>
AuthorDate: Wed May 27 19:18:19 2026 +0800

    [lumina] Support null vectors in Lumina index writer (#7988)
---
 .../org/apache/paimon/globalindex/ResultEntry.java |   3 +
 .../flink/globalindex/GenericIndexTopoBuilder.java |  27 ++-
 .../index/LuminaVectorGlobalIndexWriter.java       | 143 ++++++++++----
 .../paimon/lumina/index/FileBackedDatasetTest.java | 174 +++++++++++++++++
 .../lumina/index/LuminaVectorGlobalIndexTest.java  | 215 +++++++++++++++++++++
 5 files changed, 508 insertions(+), 54 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java
index 69938fdbdf..6891f25b17 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java
@@ -24,7 +24,10 @@ import javax.annotation.Nullable;
 public class ResultEntry {
 
     private final String fileName;
+
+    /** Total logical rows processed by the writer, including null/skipped rows. */
     private final long rowCount;
+
     @Nullable private final byte[] meta;
 
     public ResultEntry(String fileName, long rowCount, @Nullable byte[] meta) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index e37970723c..5896503ce0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -600,7 +600,7 @@ public class GenericIndexTopoBuilder {
                             createIndexWriter(table, indexType, indexField, 
mergedOptions);
 
             try {
-                long rowsWritten = 0;
+                long rowsSeen = 0;
                 long lastRowId = Long.MIN_VALUE;
                 try (RecordReader<InternalRow> reader = 
tableRead.createReader(task.split);
                         CloseableIterator<InternalRow> iter = 
reader.toCloseableIterator()) {
@@ -627,34 +627,33 @@ public class GenericIndexTopoBuilder {
                         // Only write rows within this shard's range
                         if (currentRowId >= task.shardRange.from) {
                             Object fieldData = 
indexFieldGetter.getFieldOrNull(row);
-                            if (fieldData == null) {
-                                LOG.info(
-                                        "Null vector at rowId={}, stopping 
shard [{}, {}].",
-                                        currentRowId,
-                                        task.shardRange.from,
-                                        task.shardRange.to);
-                                break;
-                            }
                             indexWriter.write(fieldData);
-                            rowsWritten++;
+                            rowsSeen++;
                         }
                     }
                 }
 
                 List<ResultEntry> resultEntries = indexWriter.finish();
+                if (!resultEntries.isEmpty() && 
resultEntries.get(0).rowCount() != rowsSeen) {
+                    LOG.warn(
+                            "rowCount mismatch: writer reported {} but caller 
saw {} rows",
+                            resultEntries.get(0).rowCount(),
+                            rowsSeen);
+                }
                 long elapsed = System.currentTimeMillis() - startTime;
                 LOG.info(
-                        "Finished shard [{}, {}]: wrote {} rows, "
+                        "Finished shard [{}, {}]: saw {} rows, "
                                 + "produced {} result entries in {} ms.",
                         task.shardRange.from,
                         task.shardRange.to,
-                        rowsWritten,
+                        rowsSeen,
                         resultEntries.size(),
                         elapsed);
 
-                if (rowsWritten == 0) {
+                if (resultEntries.isEmpty()) {
                     LOG.warn(
-                            "Shard [{}, {}] produced 0 rows, skipping index 
flush.",
+                            "Shard [{}, {}] produced no index (all null or 
empty), "
+                                    + "skipping index flush.",
                             task.shardRange.from,
                             task.shardRange.to);
                     return;
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java
index 16058f5085..6e1a31902e 100644
--- a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java
@@ -80,15 +80,19 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
     private final LuminaVectorIndexOptions options;
     private final int dim;
 
-    /** Temporary file storing vectors as raw native-order floats. */
+    /** Temporary file storing records as [rowId (long)][float * dim] in native byte order. */
     private File tempVectorFile;
 
     private FileChannel writeChannel;
     private ByteBuffer writeBuf;
 
-    private int count;
+    private final int recordSizeInBytes;
+    private final float[] vectorBuf;
+    private long count;
     private boolean closed;
 
+    private long logicalRowId;
+
     public LuminaVectorGlobalIndexWriter(
             GlobalIndexFileWriter fileWriter,
             DataType fieldType,
@@ -98,6 +102,8 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
         this.dim = options.dimension();
         this.count = 0;
         this.closed = false;
+        this.recordSizeInBytes = checkedRecordSize(dim, IO_BUFFER_SIZE);
+        this.vectorBuf = new float[dim];
 
         validateFieldType(fieldType);
 
@@ -139,26 +145,46 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
     @Override
     public void write(Object fieldData) {
         if (fieldData == null) {
-            throw new IllegalArgumentException("Field data must not be null");
+            logicalRowId++;
+            return;
         }
 
-        int bytesNeeded = dim * Float.BYTES;
-        if (writeBuf.remaining() < bytesNeeded) {
+        // Validation must complete before any buffer/state mutation below
+        float[] src = materializeAndValidate(fieldData);
+
+        if (writeBuf.remaining() < recordSizeInBytes) {
             flushWriteBuffer();
         }
+        writeBuf.putLong(logicalRowId);
+        for (int i = 0; i < dim; i++) {
+            writeBuf.putFloat(src[i]);
+        }
+        logicalRowId++;
+        count++;
+    }
 
+    /**
+     * Validates the vector and returns a float[] ready to write. For float[] input, returns the
+     * input directly (zero-copy). For InternalVector/InternalArray, reads into the reusable
+     * vectorBuf field.
+     */
+    private float[] materializeAndValidate(Object fieldData) {
         if (fieldData instanceof float[]) {
             float[] vector = (float[]) fieldData;
             checkDimension(vector.length);
             for (int i = 0; i < dim; i++) {
-                writeBuf.putFloat(vector[i]);
+                checkFinite(vector[i], i);
             }
+            return vector;
         } else if (fieldData instanceof InternalVector) {
             InternalVector vector = (InternalVector) fieldData;
             checkDimension(vector.size());
             for (int i = 0; i < dim; i++) {
-                writeBuf.putFloat(vector.getFloat(i));
+                float v = vector.getFloat(i);
+                checkFinite(v, i);
+                vectorBuf[i] = v;
             }
+            return vectorBuf;
         } else if (fieldData instanceof InternalArray) {
             InternalArray array = (InternalArray) fieldData;
             checkDimension(array.size());
@@ -166,13 +192,15 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
                 if (array.isNullAt(i)) {
                     throw new IllegalArgumentException("Vector element at 
index " + i + " is null");
                 }
-                writeBuf.putFloat(array.getFloat(i));
+                float v = array.getFloat(i);
+                checkFinite(v, i);
+                vectorBuf[i] = v;
             }
+            return vectorBuf;
         } else {
             throw new RuntimeException(
                     "Unsupported vector type: " + 
fieldData.getClass().getName());
         }
-        count++;
     }
 
     private void flushWriteBuffer() {
@@ -191,6 +219,9 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
     public List<ResultEntry> finish() {
         try {
             if (count == 0) {
+                writeChannel.close();
+                writeChannel = null;
+                writeBuf = null;
                 return Collections.emptyList();
             }
             // Flush remaining data and close the write channel
@@ -258,7 +289,8 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
                     System.currentTimeMillis() - buildStart);
 
             LuminaIndexMeta meta = new 
LuminaIndexMeta(options.toLuminaOptions());
-            return new ResultEntry(fileName, count, meta.serialize());
+            // rowCount = logical rows including nulls (not just indexed vectors)
+            return new ResultEntry(fileName, logicalRowId, meta.serialize());
         }
     }
 
@@ -322,6 +354,27 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
         }
     }
 
+    private void checkFinite(float value, int elementIndex) {
+        if (!Float.isFinite(value)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Vector element at rowId=%d, index=%d is %s",
+                            logicalRowId, elementIndex, 
Float.toString(value)));
+        }
+    }
+
+    private static int checkedRecordSize(int dim, int bufferCapacity) {
+        long recordSize = Long.BYTES + (long) dim * Float.BYTES;
+        if (recordSize > bufferCapacity || recordSize > Integer.MAX_VALUE) {
+            throw new IllegalStateException(
+                    "Vector record size "
+                            + recordSize
+                            + " exceeds buffer capacity "
+                            + bufferCapacity);
+        }
+        return (int) recordSize;
+    }
+
     @Override
     public void close() {
         if (!closed) {
@@ -348,19 +401,26 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
         private final RandomAccessFile raf;
         private final FileChannel channel;
         private final int dim;
-        private final int totalCount;
-        private int cursor;
+        private final long totalCount;
+        private final int recordSizeInBytes;
+        private long cursor;
         private final ByteBuffer readBuf;
         private final String phase;
         private int lastLoggedPercent;
 
-        FileBackedDataset(File file, int dim, int totalCount, String phase) 
throws IOException {
+        FileBackedDataset(File file, int dim, long totalCount, String phase) 
throws IOException {
+            this(file, dim, totalCount, phase, IO_BUFFER_SIZE);
+        }
+
+        FileBackedDataset(File file, int dim, long totalCount, String phase, 
int bufferSize)
+                throws IOException {
             this.raf = new RandomAccessFile(file, "r");
             this.channel = raf.getChannel();
             this.dim = dim;
             this.totalCount = totalCount;
+            this.recordSizeInBytes = checkedRecordSize(dim, bufferSize);
             this.cursor = 0;
-            this.readBuf = ByteBuffer.allocateDirect(IO_BUFFER_SIZE);
+            this.readBuf = ByteBuffer.allocateDirect(bufferSize);
             this.readBuf.order(ByteOrder.nativeOrder());
             this.readBuf.limit(0); // empty initially
             this.phase = phase;
@@ -382,43 +442,26 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
             if (cursor >= totalCount) {
                 return 0;
             }
-            int remaining = totalCount - cursor;
+            long remaining = totalCount - cursor;
             int maxByVectorBuf = vectorBuf.length / dim;
-            int batchSize = Math.min(Math.min(maxByVectorBuf, idBuf.length), 
remaining);
+            int batchSize = (int) Math.min(Math.min(maxByVectorBuf, 
idBuf.length), remaining);
 
-            int floatsNeeded = batchSize * dim;
-            int destOffset = 0;
             try {
-                while (destOffset < floatsNeeded) {
-                    if (readBuf.remaining() < Float.BYTES) {
-                        // Compact any partial float bytes and refill
-                        readBuf.compact();
-                        int bytesRead = channel.read(readBuf);
-                        readBuf.flip();
-                        if (bytesRead == -1 && readBuf.remaining() < 
Float.BYTES) {
-                            throw new IOException(
-                                    "Unexpected end of temp file: read "
-                                            + destOffset
-                                            + " floats but need "
-                                            + floatsNeeded);
-                        }
+                for (int i = 0; i < batchSize; i++) {
+                    ensureAvailable(recordSizeInBytes);
+                    idBuf[i] = readBuf.getLong();
+                    int base = i * dim;
+                    for (int d = 0; d < dim; d++) {
+                        vectorBuf[base + d] = readBuf.getFloat();
                     }
-                    int availableFloats = readBuf.remaining() / Float.BYTES;
-                    int toRead = Math.min(availableFloats, floatsNeeded - 
destOffset);
-                    readBuf.asFloatBuffer().get(vectorBuf, destOffset, toRead);
-                    readBuf.position(readBuf.position() + toRead * 
Float.BYTES);
-                    destOffset += toRead;
                 }
             } catch (IOException e) {
                 throw new RuntimeException("Failed to read vectors from temp 
file", e);
             }
 
-            for (int i = 0; i < batchSize; i++) {
-                idBuf[i] = cursor + i;
-            }
             cursor += batchSize;
 
-            int percent = (int) ((long) cursor * 100 / totalCount);
+            int percent = (int) (cursor * 100 / totalCount);
             if (percent / 10 > lastLoggedPercent / 10) {
                 LOG.info(
                         "Lumina {} progress: {}/{} vectors ({}%)",
@@ -429,6 +472,26 @@ public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter
             return batchSize;
         }
 
+        private void ensureAvailable(int minBytes) throws IOException {
+            int zeroReadCount = 0;
+            while (readBuf.remaining() < minBytes) {
+                readBuf.compact();
+                int bytesRead = channel.read(readBuf);
+                readBuf.flip();
+                if (bytesRead == -1) {
+                    throw new IOException("Unexpected end of temp file");
+                }
+                if (bytesRead == 0) {
+                    if (++zeroReadCount > 100) {
+                        throw new IOException(
+                                "Unable to read from temp file: repeated 
zero-byte reads");
+                    }
+                } else {
+                    zeroReadCount = 0;
+                }
+            }
+        }
+
         @Override
         public void close() throws IOException {
             channel.close();
diff --git a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/FileBackedDatasetTest.java b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/FileBackedDatasetTest.java
new file mode 100644
index 0000000000..3fe87e1a16
--- /dev/null
+++ b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/FileBackedDatasetTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link LuminaVectorGlobalIndexWriter.FileBackedDataset} temp file format. */
+public class FileBackedDatasetTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testBasicReadback() throws IOException {
+        int dim = 3;
+        int numVectors = 4;
+        long[] rowIds = {0, 2, 5, 7};
+        float[][] vectors = {
+            {1.0f, 2.0f, 3.0f},
+            {4.0f, 5.0f, 6.0f},
+            {7.0f, 8.0f, 9.0f},
+            {10.0f, 11.0f, 12.0f}
+        };
+
+        File tempFile = writeTempFile(dim, rowIds, vectors);
+
+        try (LuminaVectorGlobalIndexWriter.FileBackedDataset ds =
+                new LuminaVectorGlobalIndexWriter.FileBackedDataset(
+                        tempFile, dim, numVectors, "test")) {
+            assertThat(ds.dim()).isEqualTo(dim);
+            assertThat(ds.totalSize()).isEqualTo(numVectors);
+
+            float[] vectorBuf = new float[dim * numVectors];
+            long[] idBuf = new long[numVectors];
+            long read = ds.getNextBatch(vectorBuf, idBuf);
+
+            assertThat(read).isEqualTo(numVectors);
+            assertThat(idBuf).containsExactly(0, 2, 5, 7);
+            assertThat(vectorBuf[0]).isEqualTo(1.0f);
+            assertThat(vectorBuf[3]).isEqualTo(4.0f);
+            assertThat(vectorBuf[9]).isEqualTo(10.0f);
+            assertThat(vectorBuf[11]).isEqualTo(12.0f);
+
+            assertThat(ds.getNextBatch(vectorBuf, idBuf)).isEqualTo(0);
+        }
+    }
+
+    @Test
+    public void testSmallBufferForcesBoundary() throws IOException {
+        int dim = 2;
+        int numVectors = 5;
+        long[] rowIds = {0, 1, 3, 6, 10};
+        float[][] vectors = {
+            {1.0f, 2.0f},
+            {3.0f, 4.0f},
+            {5.0f, 6.0f},
+            {7.0f, 8.0f},
+            {9.0f, 10.0f}
+        };
+
+        File tempFile = writeTempFile(dim, rowIds, vectors);
+
+        // Record size = 8 + 2*4 = 16 bytes. Use buffer of 20 bytes so records always cross
+        // boundary.
+        int smallBuffer = 20;
+        try (LuminaVectorGlobalIndexWriter.FileBackedDataset ds =
+                new LuminaVectorGlobalIndexWriter.FileBackedDataset(
+                        tempFile, dim, numVectors, "test", smallBuffer)) {
+            float[] vectorBuf = new float[dim * 2];
+            long[] idBuf = new long[2];
+
+            // Read in batches of 2
+            long read = ds.getNextBatch(vectorBuf, idBuf);
+            assertThat(read).isEqualTo(2);
+            assertThat(idBuf[0]).isEqualTo(0);
+            assertThat(idBuf[1]).isEqualTo(1);
+            assertThat(vectorBuf[0]).isEqualTo(1.0f);
+            assertThat(vectorBuf[2]).isEqualTo(3.0f);
+
+            read = ds.getNextBatch(vectorBuf, idBuf);
+            assertThat(read).isEqualTo(2);
+            assertThat(idBuf[0]).isEqualTo(3);
+            assertThat(idBuf[1]).isEqualTo(6);
+
+            read = ds.getNextBatch(vectorBuf, idBuf);
+            assertThat(read).isEqualTo(1);
+            assertThat(idBuf[0]).isEqualTo(10);
+            assertThat(vectorBuf[0]).isEqualTo(9.0f);
+            assertThat(vectorBuf[1]).isEqualTo(10.0f);
+
+            assertThat(ds.getNextBatch(vectorBuf, idBuf)).isEqualTo(0);
+        }
+    }
+
+    @Test
+    public void testSingleRecordPerBuffer() throws IOException {
+        int dim = 4;
+        int numVectors = 3;
+        long[] rowIds = {100, 200, 300};
+        float[][] vectors = {
+            {1.0f, 2.0f, 3.0f, 4.0f},
+            {5.0f, 6.0f, 7.0f, 8.0f},
+            {9.0f, 10.0f, 11.0f, 12.0f}
+        };
+
+        File tempFile = writeTempFile(dim, rowIds, vectors);
+
+        // Record size = 8 + 4*4 = 24. Buffer exactly fits one record.
+        int exactBuffer = 24;
+        try (LuminaVectorGlobalIndexWriter.FileBackedDataset ds =
+                new LuminaVectorGlobalIndexWriter.FileBackedDataset(
+                        tempFile, dim, numVectors, "test", exactBuffer)) {
+            float[] vectorBuf = new float[dim];
+            long[] idBuf = new long[1];
+
+            for (int i = 0; i < numVectors; i++) {
+                long read = ds.getNextBatch(vectorBuf, idBuf);
+                assertThat(read).isEqualTo(1);
+                assertThat(idBuf[0]).isEqualTo(rowIds[i]);
+                for (int d = 0; d < dim; d++) {
+                    assertThat(vectorBuf[d]).isEqualTo(vectors[i][d]);
+                }
+            }
+            assertThat(ds.getNextBatch(vectorBuf, idBuf)).isEqualTo(0);
+        }
+    }
+
+    private File writeTempFile(int dim, long[] rowIds, float[][] vectors) 
throws IOException {
+        File tempFile = new File(tempDir.toFile(), "test-vectors.bin");
+        try (RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
+                FileChannel channel = raf.getChannel()) {
+            int recordSize = Long.BYTES + dim * Float.BYTES;
+            ByteBuffer buf = ByteBuffer.allocate(recordSize);
+            buf.order(ByteOrder.nativeOrder());
+            for (int i = 0; i < rowIds.length; i++) {
+                buf.clear();
+                buf.putLong(rowIds[i]);
+                for (int d = 0; d < dim; d++) {
+                    buf.putFloat(vectors[i][d]);
+                }
+                buf.flip();
+                while (buf.hasRemaining()) {
+                    channel.write(buf);
+                }
+            }
+        }
+        return tempFile;
+    }
+}
diff --git a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java
index f690fb7342..210baab03b 100644
--- a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java
+++ b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java
@@ -511,6 +511,221 @@ public class LuminaVectorGlobalIndexTest {
                 .hasMessageContaining("float vector");
     }
 
+    @Test
+    public void testNullVectorSkipWithCorrectIds() throws IOException {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        // [vec0, null, vec2, null, null, vec5]
+        float[][] vectors =
+                new float[][] {
+                    new float[] {1.0f, 0.0f},
+                    new float[] {0.1f, 0.95f},
+                    new float[] {0.0f, 1.0f}
+                };
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new 
LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, 
indexOptions);
+
+        writer.write(vectors[0]); // row 0
+        writer.write(null); // row 1 - null
+        writer.write(vectors[1]); // row 2
+        writer.write(null); // row 3 - null
+        writer.write(null); // row 4 - null
+        writer.write(vectors[2]); // row 5
+
+        List<ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+        assertThat(results.get(0).rowCount()).isEqualTo(6);
+
+        List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        try (LuminaVectorGlobalIndexReader reader =
+                new LuminaVectorGlobalIndexReader(fileReader, metas, 
vectorType, indexOptions)) {
+            // Search for vec0=(1,0), should find ID=0
+            VectorSearch vectorSearch = new VectorSearch(vectors[0], 3, 
fieldName);
+            LuminaScoredGlobalIndexResult result =
+                    (LuminaScoredGlobalIndexResult) 
reader.visitVectorSearch(vectorSearch).get();
+            assertThat(result.results().getLongCardinality()).isEqualTo(3);
+            // IDs should be {0, 2, 5} - shard-relative with null gaps
+            assertThat(result.results().contains(0L)).isTrue();
+            assertThat(result.results().contains(2L)).isTrue();
+            assertThat(result.results().contains(5L)).isTrue();
+            // IDs at null positions must NOT appear
+            assertThat(result.results().contains(1L)).isFalse();
+            assertThat(result.results().contains(3L)).isFalse();
+            assertThat(result.results().contains(4L)).isFalse();
+        }
+    }
+
+    @Test
+    public void testAllNullReturnsEmpty() {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new 
LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, 
indexOptions);
+
+        writer.write(null);
+        writer.write(null);
+        writer.write(null);
+
+        List<ResultEntry> results = writer.finish();
+        assertThat(results).isEmpty();
+    }
+
+    @Test
+    public void testNullGapWithPreFilter() throws IOException {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        // [vec0, null, vec2, vec3, null, vec5]
+        float[][] vectors =
+                new float[][] {
+                    new float[] {1.0f, 0.0f},
+                    new float[] {0.95f, 0.1f},
+                    new float[] {0.9f, 0.2f},
+                    new float[] {0.0f, 1.0f}
+                };
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new 
LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, 
indexOptions);
+
+        writer.write(vectors[0]); // row 0
+        writer.write(null); // row 1 - null
+        writer.write(vectors[1]); // row 2
+        writer.write(vectors[2]); // row 3
+        writer.write(null); // row 4 - null
+        writer.write(vectors[3]); // row 5
+
+        List<ResultEntry> results = writer.finish();
+        List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        try (LuminaVectorGlobalIndexReader reader =
+                new LuminaVectorGlobalIndexReader(fileReader, metas, 
vectorType, indexOptions)) {
+            // Pre-filter includes null gap IDs {1, 4} and valid ID {2}
+            RoaringNavigableMap64 filter = new RoaringNavigableMap64();
+            filter.add(1L); // null position - should not match
+            filter.add(2L); // valid vector
+            filter.add(4L); // null position - should not match
+            VectorSearch search =
+                    new VectorSearch(vectors[0], 3, 
fieldName).withIncludeRowIds(filter);
+            LuminaScoredGlobalIndexResult result =
+                    (LuminaScoredGlobalIndexResult) 
reader.visitVectorSearch(search).get();
+            // Only row 2 should be in results (rows 1 and 4 are null gaps)
+            assertThat(result.results().getLongCardinality()).isEqualTo(1);
+            assertThat(result.results().contains(2L)).isTrue();
+            assertThat(result.results().contains(1L)).isFalse();
+            assertThat(result.results().contains(4L)).isFalse();
+        }
+    }
+
+    @Test
+    public void testNullAtStart() throws IOException {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new 
LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, 
indexOptions);
+
+        writer.write(null); // row 0 - null
+        writer.write(new float[] {1.0f, 0.0f}); // row 1
+
+        List<ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+        assertThat(results.get(0).rowCount()).isEqualTo(2);
+
+        List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        try (LuminaVectorGlobalIndexReader reader =
+                new LuminaVectorGlobalIndexReader(fileReader, metas, 
vectorType, indexOptions)) {
+            VectorSearch search = new VectorSearch(new float[] {1.0f, 0.0f}, 
1, fieldName);
+            LuminaScoredGlobalIndexResult result =
+                    (LuminaScoredGlobalIndexResult) 
reader.visitVectorSearch(search).get();
+            assertThat(result.results().contains(1L)).isTrue();
+            assertThat(result.results().contains(0L)).isFalse();
+        }
+    }
+
+    @Test
+    public void testNullAtEnd() throws IOException {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new 
LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, 
indexOptions);
+
+        writer.write(new float[] {1.0f, 0.0f}); // row 0
+        writer.write(null); // row 1 - null
+
+        List<ResultEntry> results = writer.finish();
+        assertThat(results).hasSize(1);
+        assertThat(results.get(0).rowCount()).isEqualTo(2);
+
+        List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        try (LuminaVectorGlobalIndexReader reader =
+                new LuminaVectorGlobalIndexReader(fileReader, metas, 
vectorType, indexOptions)) {
+            VectorSearch search = new VectorSearch(new float[] {1.0f, 0.0f}, 
1, fieldName);
+            LuminaScoredGlobalIndexResult result =
+                    (LuminaScoredGlobalIndexResult) 
reader.visitVectorSearch(search).get();
+            assertThat(result.results().contains(0L)).isTrue();
+            assertThat(result.results().contains(1L)).isFalse();
+        }
+    }
+
+    @Test
+    public void testNanInVectorRejected() {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new 
LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, 
indexOptions);
+
+        assertThatThrownBy(() -> writer.write(new float[] {1.0f, Float.NaN}))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("rowId=0")
+                .hasMessageContaining("index=1")
+                .hasMessageContaining("NaN");
+    }
+
+    @Test
+    public void testInfinityInVectorRejected() {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new 
LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, 
indexOptions);
+
+        writer.write(null); // row 0 - null, advances logicalRowId
+        assertThatThrownBy(() -> writer.write(new float[] 
{Float.POSITIVE_INFINITY, 0.0f}))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("rowId=1")
+                .hasMessageContaining("index=0")
+                .hasMessageContaining("Infinity");
+
+        assertThatThrownBy(() -> writer.write(new float[] {0.0f, 
Float.NEGATIVE_INFINITY}))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("rowId=1")
+                .hasMessageContaining("index=1")
+                .hasMessageContaining("-Infinity");
+    }
+
     private Options createDefaultOptions(int dimension) {
         Options options = new Options();
         options.setInteger(LuminaVectorIndexOptions.DIMENSION.key(), 
dimension);

Reply via email to