This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed42a28f5e [lumina] Support null vectors in Lumina index writer (#7988)
ed42a28f5e is described below
commit ed42a28f5ec07a44ed378cb03e957a820d1115aa
Author: jerry <[email protected]>
AuthorDate: Wed May 27 19:18:19 2026 +0800
[lumina] Support null vectors in Lumina index writer (#7988)
---
.../org/apache/paimon/globalindex/ResultEntry.java | 3 +
.../flink/globalindex/GenericIndexTopoBuilder.java | 27 ++-
.../index/LuminaVectorGlobalIndexWriter.java | 143 ++++++++++----
.../paimon/lumina/index/FileBackedDatasetTest.java | 174 +++++++++++++++++
.../lumina/index/LuminaVectorGlobalIndexTest.java | 215 +++++++++++++++++++++
5 files changed, 508 insertions(+), 54 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java
index 69938fdbdf..6891f25b17 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java
@@ -24,7 +24,10 @@ import javax.annotation.Nullable;
public class ResultEntry {
private final String fileName;
+
+ /** Total logical rows processed by the writer, including null/skipped rows. */
private final long rowCount;
+
@Nullable private final byte[] meta;
public ResultEntry(String fileName, long rowCount, @Nullable byte[] meta) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index e37970723c..5896503ce0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -600,7 +600,7 @@ public class GenericIndexTopoBuilder {
createIndexWriter(table, indexType, indexField,
mergedOptions);
try {
- long rowsWritten = 0;
+ long rowsSeen = 0;
long lastRowId = Long.MIN_VALUE;
try (RecordReader<InternalRow> reader =
tableRead.createReader(task.split);
CloseableIterator<InternalRow> iter =
reader.toCloseableIterator()) {
@@ -627,34 +627,33 @@ public class GenericIndexTopoBuilder {
// Only write rows within this shard's range
if (currentRowId >= task.shardRange.from) {
Object fieldData =
indexFieldGetter.getFieldOrNull(row);
- if (fieldData == null) {
- LOG.info(
- "Null vector at rowId={}, stopping
shard [{}, {}].",
- currentRowId,
- task.shardRange.from,
- task.shardRange.to);
- break;
- }
indexWriter.write(fieldData);
- rowsWritten++;
+ rowsSeen++;
}
}
}
List<ResultEntry> resultEntries = indexWriter.finish();
+ if (!resultEntries.isEmpty() &&
resultEntries.get(0).rowCount() != rowsSeen) {
+ LOG.warn(
+ "rowCount mismatch: writer reported {} but caller
saw {} rows",
+ resultEntries.get(0).rowCount(),
+ rowsSeen);
+ }
long elapsed = System.currentTimeMillis() - startTime;
LOG.info(
- "Finished shard [{}, {}]: wrote {} rows, "
+ "Finished shard [{}, {}]: saw {} rows, "
+ "produced {} result entries in {} ms.",
task.shardRange.from,
task.shardRange.to,
- rowsWritten,
+ rowsSeen,
resultEntries.size(),
elapsed);
- if (rowsWritten == 0) {
+ if (resultEntries.isEmpty()) {
LOG.warn(
- "Shard [{}, {}] produced 0 rows, skipping index
flush.",
+ "Shard [{}, {}] produced no index (all null or
empty), "
+ + "skipping index flush.",
task.shardRange.from,
task.shardRange.to);
return;
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java
index 16058f5085..6e1a31902e 100644
--- a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java
@@ -80,15 +80,19 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
private final LuminaVectorIndexOptions options;
private final int dim;
- /** Temporary file storing vectors as raw native-order floats. */
+ /** Temporary file storing records as [rowId (long)][float * dim] in native byte order. */
private File tempVectorFile;
private FileChannel writeChannel;
private ByteBuffer writeBuf;
- private int count;
+ private final int recordSizeInBytes;
+ private final float[] vectorBuf;
+ private long count;
private boolean closed;
+ private long logicalRowId;
+
public LuminaVectorGlobalIndexWriter(
GlobalIndexFileWriter fileWriter,
DataType fieldType,
@@ -98,6 +102,8 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
this.dim = options.dimension();
this.count = 0;
this.closed = false;
+ this.recordSizeInBytes = checkedRecordSize(dim, IO_BUFFER_SIZE);
+ this.vectorBuf = new float[dim];
validateFieldType(fieldType);
@@ -139,26 +145,46 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
@Override
public void write(Object fieldData) {
if (fieldData == null) {
- throw new IllegalArgumentException("Field data must not be null");
+ logicalRowId++;
+ return;
}
- int bytesNeeded = dim * Float.BYTES;
- if (writeBuf.remaining() < bytesNeeded) {
+ // Validation must complete before any buffer/state mutation below
+ float[] src = materializeAndValidate(fieldData);
+
+ if (writeBuf.remaining() < recordSizeInBytes) {
flushWriteBuffer();
}
+ writeBuf.putLong(logicalRowId);
+ for (int i = 0; i < dim; i++) {
+ writeBuf.putFloat(src[i]);
+ }
+ logicalRowId++;
+ count++;
+ }
+ /**
+ * Validates the vector and returns a float[] ready to write. For float[] input, returns the
+ * input directly (zero-copy). For InternalVector/InternalArray, reads into the reusable
+ * vectorBuf field.
+ */
+ private float[] materializeAndValidate(Object fieldData) {
if (fieldData instanceof float[]) {
float[] vector = (float[]) fieldData;
checkDimension(vector.length);
for (int i = 0; i < dim; i++) {
- writeBuf.putFloat(vector[i]);
+ checkFinite(vector[i], i);
}
+ return vector;
} else if (fieldData instanceof InternalVector) {
InternalVector vector = (InternalVector) fieldData;
checkDimension(vector.size());
for (int i = 0; i < dim; i++) {
- writeBuf.putFloat(vector.getFloat(i));
+ float v = vector.getFloat(i);
+ checkFinite(v, i);
+ vectorBuf[i] = v;
}
+ return vectorBuf;
} else if (fieldData instanceof InternalArray) {
InternalArray array = (InternalArray) fieldData;
checkDimension(array.size());
@@ -166,13 +192,15 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
if (array.isNullAt(i)) {
throw new IllegalArgumentException("Vector element at
index " + i + " is null");
}
- writeBuf.putFloat(array.getFloat(i));
+ float v = array.getFloat(i);
+ checkFinite(v, i);
+ vectorBuf[i] = v;
}
+ return vectorBuf;
} else {
throw new RuntimeException(
"Unsupported vector type: " +
fieldData.getClass().getName());
}
- count++;
}
private void flushWriteBuffer() {
@@ -191,6 +219,9 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
public List<ResultEntry> finish() {
try {
if (count == 0) {
+ writeChannel.close();
+ writeChannel = null;
+ writeBuf = null;
return Collections.emptyList();
}
// Flush remaining data and close the write channel
@@ -258,7 +289,8 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
System.currentTimeMillis() - buildStart);
LuminaIndexMeta meta = new
LuminaIndexMeta(options.toLuminaOptions());
- return new ResultEntry(fileName, count, meta.serialize());
+ // rowCount = logical rows including nulls (not just indexed vectors)
+ return new ResultEntry(fileName, logicalRowId, meta.serialize());
}
}
@@ -322,6 +354,27 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
}
}
+ private void checkFinite(float value, int elementIndex) {
+ if (!Float.isFinite(value)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Vector element at rowId=%d, index=%d is %s",
+ logicalRowId, elementIndex,
Float.toString(value)));
+ }
+ }
+
+ private static int checkedRecordSize(int dim, int bufferCapacity) {
+ long recordSize = Long.BYTES + (long) dim * Float.BYTES;
+ if (recordSize > bufferCapacity || recordSize > Integer.MAX_VALUE) {
+ throw new IllegalStateException(
+ "Vector record size "
+ + recordSize
+ + " exceeds buffer capacity "
+ + bufferCapacity);
+ }
+ return (int) recordSize;
+ }
+
@Override
public void close() {
if (!closed) {
@@ -348,19 +401,26 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
private final RandomAccessFile raf;
private final FileChannel channel;
private final int dim;
- private final int totalCount;
- private int cursor;
+ private final long totalCount;
+ private final int recordSizeInBytes;
+ private long cursor;
private final ByteBuffer readBuf;
private final String phase;
private int lastLoggedPercent;
- FileBackedDataset(File file, int dim, int totalCount, String phase)
throws IOException {
+ FileBackedDataset(File file, int dim, long totalCount, String phase)
throws IOException {
+ this(file, dim, totalCount, phase, IO_BUFFER_SIZE);
+ }
+
+ FileBackedDataset(File file, int dim, long totalCount, String phase,
int bufferSize)
+ throws IOException {
this.raf = new RandomAccessFile(file, "r");
this.channel = raf.getChannel();
this.dim = dim;
this.totalCount = totalCount;
+ this.recordSizeInBytes = checkedRecordSize(dim, bufferSize);
this.cursor = 0;
- this.readBuf = ByteBuffer.allocateDirect(IO_BUFFER_SIZE);
+ this.readBuf = ByteBuffer.allocateDirect(bufferSize);
this.readBuf.order(ByteOrder.nativeOrder());
this.readBuf.limit(0); // empty initially
this.phase = phase;
@@ -382,43 +442,26 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
if (cursor >= totalCount) {
return 0;
}
- int remaining = totalCount - cursor;
+ long remaining = totalCount - cursor;
int maxByVectorBuf = vectorBuf.length / dim;
- int batchSize = Math.min(Math.min(maxByVectorBuf, idBuf.length),
remaining);
+ int batchSize = (int) Math.min(Math.min(maxByVectorBuf,
idBuf.length), remaining);
- int floatsNeeded = batchSize * dim;
- int destOffset = 0;
try {
- while (destOffset < floatsNeeded) {
- if (readBuf.remaining() < Float.BYTES) {
- // Compact any partial float bytes and refill
- readBuf.compact();
- int bytesRead = channel.read(readBuf);
- readBuf.flip();
- if (bytesRead == -1 && readBuf.remaining() <
Float.BYTES) {
- throw new IOException(
- "Unexpected end of temp file: read "
- + destOffset
- + " floats but need "
- + floatsNeeded);
- }
+ for (int i = 0; i < batchSize; i++) {
+ ensureAvailable(recordSizeInBytes);
+ idBuf[i] = readBuf.getLong();
+ int base = i * dim;
+ for (int d = 0; d < dim; d++) {
+ vectorBuf[base + d] = readBuf.getFloat();
}
- int availableFloats = readBuf.remaining() / Float.BYTES;
- int toRead = Math.min(availableFloats, floatsNeeded -
destOffset);
- readBuf.asFloatBuffer().get(vectorBuf, destOffset, toRead);
- readBuf.position(readBuf.position() + toRead *
Float.BYTES);
- destOffset += toRead;
}
} catch (IOException e) {
throw new RuntimeException("Failed to read vectors from temp
file", e);
}
- for (int i = 0; i < batchSize; i++) {
- idBuf[i] = cursor + i;
- }
cursor += batchSize;
- int percent = (int) ((long) cursor * 100 / totalCount);
+ int percent = (int) (cursor * 100 / totalCount);
if (percent / 10 > lastLoggedPercent / 10) {
LOG.info(
"Lumina {} progress: {}/{} vectors ({}%)",
@@ -429,6 +472,26 @@ public class LuminaVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter
return batchSize;
}
+ private void ensureAvailable(int minBytes) throws IOException {
+ int zeroReadCount = 0;
+ while (readBuf.remaining() < minBytes) {
+ readBuf.compact();
+ int bytesRead = channel.read(readBuf);
+ readBuf.flip();
+ if (bytesRead == -1) {
+ throw new IOException("Unexpected end of temp file");
+ }
+ if (bytesRead == 0) {
+ if (++zeroReadCount > 100) {
+ throw new IOException(
+ "Unable to read from temp file: repeated
zero-byte reads");
+ }
+ } else {
+ zeroReadCount = 0;
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
channel.close();
diff --git a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/FileBackedDatasetTest.java b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/FileBackedDatasetTest.java
new file mode 100644
index 0000000000..3fe87e1a16
--- /dev/null
+++ b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/FileBackedDatasetTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link LuminaVectorGlobalIndexWriter.FileBackedDataset} temp file format. */
+public class FileBackedDatasetTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testBasicReadback() throws IOException {
+ int dim = 3;
+ int numVectors = 4;
+ long[] rowIds = {0, 2, 5, 7};
+ float[][] vectors = {
+ {1.0f, 2.0f, 3.0f},
+ {4.0f, 5.0f, 6.0f},
+ {7.0f, 8.0f, 9.0f},
+ {10.0f, 11.0f, 12.0f}
+ };
+
+ File tempFile = writeTempFile(dim, rowIds, vectors);
+
+ try (LuminaVectorGlobalIndexWriter.FileBackedDataset ds =
+ new LuminaVectorGlobalIndexWriter.FileBackedDataset(
+ tempFile, dim, numVectors, "test")) {
+ assertThat(ds.dim()).isEqualTo(dim);
+ assertThat(ds.totalSize()).isEqualTo(numVectors);
+
+ float[] vectorBuf = new float[dim * numVectors];
+ long[] idBuf = new long[numVectors];
+ long read = ds.getNextBatch(vectorBuf, idBuf);
+
+ assertThat(read).isEqualTo(numVectors);
+ assertThat(idBuf).containsExactly(0, 2, 5, 7);
+ assertThat(vectorBuf[0]).isEqualTo(1.0f);
+ assertThat(vectorBuf[3]).isEqualTo(4.0f);
+ assertThat(vectorBuf[9]).isEqualTo(10.0f);
+ assertThat(vectorBuf[11]).isEqualTo(12.0f);
+
+ assertThat(ds.getNextBatch(vectorBuf, idBuf)).isEqualTo(0);
+ }
+ }
+
+ @Test
+ public void testSmallBufferForcesBoundary() throws IOException {
+ int dim = 2;
+ int numVectors = 5;
+ long[] rowIds = {0, 1, 3, 6, 10};
+ float[][] vectors = {
+ {1.0f, 2.0f},
+ {3.0f, 4.0f},
+ {5.0f, 6.0f},
+ {7.0f, 8.0f},
+ {9.0f, 10.0f}
+ };
+
+ File tempFile = writeTempFile(dim, rowIds, vectors);
+
+ // Record size = 8 + 2*4 = 16 bytes. Use buffer of 20 bytes so records always cross
+ // boundary.
+ int smallBuffer = 20;
+ try (LuminaVectorGlobalIndexWriter.FileBackedDataset ds =
+ new LuminaVectorGlobalIndexWriter.FileBackedDataset(
+ tempFile, dim, numVectors, "test", smallBuffer)) {
+ float[] vectorBuf = new float[dim * 2];
+ long[] idBuf = new long[2];
+
+ // Read in batches of 2
+ long read = ds.getNextBatch(vectorBuf, idBuf);
+ assertThat(read).isEqualTo(2);
+ assertThat(idBuf[0]).isEqualTo(0);
+ assertThat(idBuf[1]).isEqualTo(1);
+ assertThat(vectorBuf[0]).isEqualTo(1.0f);
+ assertThat(vectorBuf[2]).isEqualTo(3.0f);
+
+ read = ds.getNextBatch(vectorBuf, idBuf);
+ assertThat(read).isEqualTo(2);
+ assertThat(idBuf[0]).isEqualTo(3);
+ assertThat(idBuf[1]).isEqualTo(6);
+
+ read = ds.getNextBatch(vectorBuf, idBuf);
+ assertThat(read).isEqualTo(1);
+ assertThat(idBuf[0]).isEqualTo(10);
+ assertThat(vectorBuf[0]).isEqualTo(9.0f);
+ assertThat(vectorBuf[1]).isEqualTo(10.0f);
+
+ assertThat(ds.getNextBatch(vectorBuf, idBuf)).isEqualTo(0);
+ }
+ }
+
+ @Test
+ public void testSingleRecordPerBuffer() throws IOException {
+ int dim = 4;
+ int numVectors = 3;
+ long[] rowIds = {100, 200, 300};
+ float[][] vectors = {
+ {1.0f, 2.0f, 3.0f, 4.0f},
+ {5.0f, 6.0f, 7.0f, 8.0f},
+ {9.0f, 10.0f, 11.0f, 12.0f}
+ };
+
+ File tempFile = writeTempFile(dim, rowIds, vectors);
+
+ // Record size = 8 + 4*4 = 24. Buffer exactly fits one record.
+ int exactBuffer = 24;
+ try (LuminaVectorGlobalIndexWriter.FileBackedDataset ds =
+ new LuminaVectorGlobalIndexWriter.FileBackedDataset(
+ tempFile, dim, numVectors, "test", exactBuffer)) {
+ float[] vectorBuf = new float[dim];
+ long[] idBuf = new long[1];
+
+ for (int i = 0; i < numVectors; i++) {
+ long read = ds.getNextBatch(vectorBuf, idBuf);
+ assertThat(read).isEqualTo(1);
+ assertThat(idBuf[0]).isEqualTo(rowIds[i]);
+ for (int d = 0; d < dim; d++) {
+ assertThat(vectorBuf[d]).isEqualTo(vectors[i][d]);
+ }
+ }
+ assertThat(ds.getNextBatch(vectorBuf, idBuf)).isEqualTo(0);
+ }
+ }
+
+ private File writeTempFile(int dim, long[] rowIds, float[][] vectors)
throws IOException {
+ File tempFile = new File(tempDir.toFile(), "test-vectors.bin");
+ try (RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
+ FileChannel channel = raf.getChannel()) {
+ int recordSize = Long.BYTES + dim * Float.BYTES;
+ ByteBuffer buf = ByteBuffer.allocate(recordSize);
+ buf.order(ByteOrder.nativeOrder());
+ for (int i = 0; i < rowIds.length; i++) {
+ buf.clear();
+ buf.putLong(rowIds[i]);
+ for (int d = 0; d < dim; d++) {
+ buf.putFloat(vectors[i][d]);
+ }
+ buf.flip();
+ while (buf.hasRemaining()) {
+ channel.write(buf);
+ }
+ }
+ }
+ return tempFile;
+ }
+}
diff --git a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java
index f690fb7342..210baab03b 100644
--- a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java
+++ b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java
@@ -511,6 +511,221 @@ public class LuminaVectorGlobalIndexTest {
.hasMessageContaining("float vector");
}
+ @Test
+ public void testNullVectorSkipWithCorrectIds() throws IOException {
+ int dimension = 2;
+ Options options = createDefaultOptions(dimension);
+
+ // [vec0, null, vec2, null, null, vec5]
+ float[][] vectors =
+ new float[][] {
+ new float[] {1.0f, 0.0f},
+ new float[] {0.1f, 0.95f},
+ new float[] {0.0f, 1.0f}
+ };
+
+ GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+ LuminaVectorIndexOptions indexOptions = new
LuminaVectorIndexOptions(options);
+ LuminaVectorGlobalIndexWriter writer =
+ new LuminaVectorGlobalIndexWriter(fileWriter, vectorType,
indexOptions);
+
+ writer.write(vectors[0]); // row 0
+ writer.write(null); // row 1 - null
+ writer.write(vectors[1]); // row 2
+ writer.write(null); // row 3 - null
+ writer.write(null); // row 4 - null
+ writer.write(vectors[2]); // row 5
+
+ List<ResultEntry> results = writer.finish();
+ assertThat(results).hasSize(1);
+ assertThat(results.get(0).rowCount()).isEqualTo(6);
+
+ List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+ GlobalIndexFileReader fileReader = createFileReader(indexPath);
+ try (LuminaVectorGlobalIndexReader reader =
+ new LuminaVectorGlobalIndexReader(fileReader, metas,
vectorType, indexOptions)) {
+ // Search for vec0=(1,0), should find ID=0
+ VectorSearch vectorSearch = new VectorSearch(vectors[0], 3,
fieldName);
+ LuminaScoredGlobalIndexResult result =
+ (LuminaScoredGlobalIndexResult)
reader.visitVectorSearch(vectorSearch).get();
+ assertThat(result.results().getLongCardinality()).isEqualTo(3);
+ // IDs should be {0, 2, 5} - shard-relative with null gaps
+ assertThat(result.results().contains(0L)).isTrue();
+ assertThat(result.results().contains(2L)).isTrue();
+ assertThat(result.results().contains(5L)).isTrue();
+ // IDs at null positions must NOT appear
+ assertThat(result.results().contains(1L)).isFalse();
+ assertThat(result.results().contains(3L)).isFalse();
+ assertThat(result.results().contains(4L)).isFalse();
+ }
+ }
+
+ @Test
+ public void testAllNullReturnsEmpty() {
+ int dimension = 2;
+ Options options = createDefaultOptions(dimension);
+
+ GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+ LuminaVectorIndexOptions indexOptions = new
LuminaVectorIndexOptions(options);
+ LuminaVectorGlobalIndexWriter writer =
+ new LuminaVectorGlobalIndexWriter(fileWriter, vectorType,
indexOptions);
+
+ writer.write(null);
+ writer.write(null);
+ writer.write(null);
+
+ List<ResultEntry> results = writer.finish();
+ assertThat(results).isEmpty();
+ }
+
+ @Test
+ public void testNullGapWithPreFilter() throws IOException {
+ int dimension = 2;
+ Options options = createDefaultOptions(dimension);
+
+ // [vec0, null, vec2, vec3, null, vec5]
+ float[][] vectors =
+ new float[][] {
+ new float[] {1.0f, 0.0f},
+ new float[] {0.95f, 0.1f},
+ new float[] {0.9f, 0.2f},
+ new float[] {0.0f, 1.0f}
+ };
+
+ GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+ LuminaVectorIndexOptions indexOptions = new
LuminaVectorIndexOptions(options);
+ LuminaVectorGlobalIndexWriter writer =
+ new LuminaVectorGlobalIndexWriter(fileWriter, vectorType,
indexOptions);
+
+ writer.write(vectors[0]); // row 0
+ writer.write(null); // row 1 - null
+ writer.write(vectors[1]); // row 2
+ writer.write(vectors[2]); // row 3
+ writer.write(null); // row 4 - null
+ writer.write(vectors[3]); // row 5
+
+ List<ResultEntry> results = writer.finish();
+ List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+
+ GlobalIndexFileReader fileReader = createFileReader(indexPath);
+ try (LuminaVectorGlobalIndexReader reader =
+ new LuminaVectorGlobalIndexReader(fileReader, metas,
vectorType, indexOptions)) {
+ // Pre-filter includes null gap IDs {1, 4} and valid ID {2}
+ RoaringNavigableMap64 filter = new RoaringNavigableMap64();
+ filter.add(1L); // null position - should not match
+ filter.add(2L); // valid vector
+ filter.add(4L); // null position - should not match
+ VectorSearch search =
+ new VectorSearch(vectors[0], 3,
fieldName).withIncludeRowIds(filter);
+ LuminaScoredGlobalIndexResult result =
+ (LuminaScoredGlobalIndexResult)
reader.visitVectorSearch(search).get();
+ // Only row 2 should be in results (rows 1 and 4 are null gaps)
+ assertThat(result.results().getLongCardinality()).isEqualTo(1);
+ assertThat(result.results().contains(2L)).isTrue();
+ assertThat(result.results().contains(1L)).isFalse();
+ assertThat(result.results().contains(4L)).isFalse();
+ }
+ }
+
+ @Test
+ public void testNullAtStart() throws IOException {
+ int dimension = 2;
+ Options options = createDefaultOptions(dimension);
+
+ GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+ LuminaVectorIndexOptions indexOptions = new
LuminaVectorIndexOptions(options);
+ LuminaVectorGlobalIndexWriter writer =
+ new LuminaVectorGlobalIndexWriter(fileWriter, vectorType,
indexOptions);
+
+ writer.write(null); // row 0 - null
+ writer.write(new float[] {1.0f, 0.0f}); // row 1
+
+ List<ResultEntry> results = writer.finish();
+ assertThat(results).hasSize(1);
+ assertThat(results.get(0).rowCount()).isEqualTo(2);
+
+ List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+ GlobalIndexFileReader fileReader = createFileReader(indexPath);
+ try (LuminaVectorGlobalIndexReader reader =
+ new LuminaVectorGlobalIndexReader(fileReader, metas,
vectorType, indexOptions)) {
+ VectorSearch search = new VectorSearch(new float[] {1.0f, 0.0f},
1, fieldName);
+ LuminaScoredGlobalIndexResult result =
+ (LuminaScoredGlobalIndexResult)
reader.visitVectorSearch(search).get();
+ assertThat(result.results().contains(1L)).isTrue();
+ assertThat(result.results().contains(0L)).isFalse();
+ }
+ }
+
+ @Test
+ public void testNullAtEnd() throws IOException {
+ int dimension = 2;
+ Options options = createDefaultOptions(dimension);
+
+ GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+ LuminaVectorIndexOptions indexOptions = new
LuminaVectorIndexOptions(options);
+ LuminaVectorGlobalIndexWriter writer =
+ new LuminaVectorGlobalIndexWriter(fileWriter, vectorType,
indexOptions);
+
+ writer.write(new float[] {1.0f, 0.0f}); // row 0
+ writer.write(null); // row 1 - null
+
+ List<ResultEntry> results = writer.finish();
+ assertThat(results).hasSize(1);
+ assertThat(results.get(0).rowCount()).isEqualTo(2);
+
+ List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+ GlobalIndexFileReader fileReader = createFileReader(indexPath);
+ try (LuminaVectorGlobalIndexReader reader =
+ new LuminaVectorGlobalIndexReader(fileReader, metas,
vectorType, indexOptions)) {
+ VectorSearch search = new VectorSearch(new float[] {1.0f, 0.0f},
1, fieldName);
+ LuminaScoredGlobalIndexResult result =
+ (LuminaScoredGlobalIndexResult)
reader.visitVectorSearch(search).get();
+ assertThat(result.results().contains(0L)).isTrue();
+ assertThat(result.results().contains(1L)).isFalse();
+ }
+ }
+
+ @Test
+ public void testNanInVectorRejected() {
+ int dimension = 2;
+ Options options = createDefaultOptions(dimension);
+
+ GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+ LuminaVectorIndexOptions indexOptions = new
LuminaVectorIndexOptions(options);
+ LuminaVectorGlobalIndexWriter writer =
+ new LuminaVectorGlobalIndexWriter(fileWriter, vectorType,
indexOptions);
+
+ assertThatThrownBy(() -> writer.write(new float[] {1.0f, Float.NaN}))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("rowId=0")
+ .hasMessageContaining("index=1")
+ .hasMessageContaining("NaN");
+ }
+
+ @Test
+ public void testInfinityInVectorRejected() {
+ int dimension = 2;
+ Options options = createDefaultOptions(dimension);
+
+ GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+ LuminaVectorIndexOptions indexOptions = new
LuminaVectorIndexOptions(options);
+ LuminaVectorGlobalIndexWriter writer =
+ new LuminaVectorGlobalIndexWriter(fileWriter, vectorType,
indexOptions);
+
+ writer.write(null); // row 0 - null, advances logicalRowId
+ assertThatThrownBy(() -> writer.write(new float[]
{Float.POSITIVE_INFINITY, 0.0f}))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("rowId=1")
+ .hasMessageContaining("index=0")
+ .hasMessageContaining("Infinity");
+
+ assertThatThrownBy(() -> writer.write(new float[] {0.0f,
Float.NEGATIVE_INFINITY}))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("rowId=1")
+ .hasMessageContaining("index=1")
+ .hasMessageContaining("-Infinity");
+ }
+
private Options createDefaultOptions(int dimension) {
Options options = new Options();
options.setInteger(LuminaVectorIndexOptions.DIMENSION.key(),
dimension);