This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git
The following commit(s) were added to refs/heads/main by this push:
new a95b679 Guard JNI native handle lifecycle (#27)
a95b679 is described below
commit a95b6793e2df94948eb3dd22d322543e167d5ca8
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 10 17:57:32 2026 +0800
Guard JNI native handle lifecycle (#27)
---
.github/workflows/ci.yml | 2 +
.../paimon/index/ivfpq/VectorIndexJavaApiTest.java | 71 ++++++++
.../ivfpq/VectorIndexNativeHandleSafetyTest.java | 198 +++++++++++++++++++++
.../paimon/index/ivfpq/VectorIndexReader.java | 84 +++++++--
.../paimon/index/ivfpq/VectorIndexWriter.java | 56 +++++-
5 files changed, 391 insertions(+), 20 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 307394b..2293a62 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -117,6 +117,8 @@ jobs:
run: |
java -cp target/java-api-test org.apache.paimon.index.ivfpq.VectorIndexNativePanicBoundaryTest \
"$(pwd)/target/release/libpaimon_vindex_jni.so"
+ java -cp target/java-api-test org.apache.paimon.index.ivfpq.VectorIndexNativeHandleSafetyTest \
+ "$(pwd)/target/release/libpaimon_vindex_jni.so"
python-build:
runs-on: ubuntu-latest
diff --git a/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexJavaApiTest.java b/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexJavaApiTest.java
index 4ea2a68..d24badc 100644
--- a/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexJavaApiTest.java
+++ b/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexJavaApiTest.java
@@ -28,6 +28,8 @@ public class VectorIndexJavaApiTest {
testBatchResultCopiesArraysAndSlicesRows();
testConfigValidation();
testMetadata();
+ testClosedReaderRejectsOperations();
+ testClosedWriterRejectsOperations();
testReaderAndWriterApiCompile();
}
@@ -139,6 +141,75 @@ public class VectorIndexJavaApiTest {
assertEquals(20, metadata.hnsw().m());
}
+ private static void testClosedReaderRejectsOperations() {
+ final VectorIndexReader reader = VectorIndexReader.fromNativePointerForTesting(0L);
+ reader.close();
+ reader.close();
+
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ reader.metadata();
+ }
+ });
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ reader.indexType();
+ }
+ });
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ reader.dimension();
+ }
+ });
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ reader.totalVectors();
+ }
+ });
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ reader.search(new float[] {0.0f}, 1, 1);
+ }
+ });
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ reader.searchBatch(new float[] {0.0f}, 1, 1, 1);
+ }
+ });
+ }
+
+ private static void testClosedWriterRejectsOperations() {
+ VectorIndexConfig config = VectorIndexConfig.ivfPq(2, 4, 1, Metric.L2, false);
+ final VectorIndexWriter writer = VectorIndexWriter.fromNativePointerForTesting(0L, config);
+ writer.close();
+ writer.close();
+
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ writer.train(new float[] {0.0f, 1.0f}, 1);
+ }
+ });
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ writer.addVectors(new long[] {1L}, new float[] {0.0f, 1.0f}, 1);
+ }
+ });
+ assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ writer.writeIndex(new Object());
+ }
+ });
+ }
+
private static void testReaderAndWriterApiCompile() {
VectorIndexConfig config = VectorIndexConfig.ivfPq(2, 4, 1, Metric.L2, false);
VectorIndexReader closedReader = VectorIndexReader.fromNativePointerForTesting(0L);
diff --git a/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexNativeHandleSafetyTest.java b/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexNativeHandleSafetyTest.java
new file mode 100644
index 0000000..7dc93f4
--- /dev/null
+++ b/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexNativeHandleSafetyTest.java
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.paimon.index.ivfpq;
+
+import java.io.ByteArrayOutputStream;
+
+public class VectorIndexNativeHandleSafetyTest {
+
+ public static void main(String[] args) {
+ if (args.length != 1) {
+ throw new IllegalArgumentException("native library path is required");
+ }
+
+ System.load(args[0]);
+
+ testWriterRejectsReentrantCloseDuringNativeCall();
+ testReaderRejectsReentrantCloseDuringNativeCall();
+ }
+
+ private static void testWriterRejectsReentrantCloseDuringNativeCall() {
+ VectorIndexWriter writer = newPopulatedWriter();
+ SelfClosingOutputStream output = new SelfClosingOutputStream(writer);
+ try {
+ writer.writeIndex(output);
+ assertTrue(output.closeAttempted());
+ assertTrue(output.closeRejected());
+ assertTrue(output.toByteArray().length > 0);
+ } finally {
+ writer.close();
+ }
+ }
+
+ private static void testReaderRejectsReentrantCloseDuringNativeCall() {
+ SelfClosingSeekableInputStream input =
+ new SelfClosingSeekableInputStream(buildIndexBytes());
+ VectorIndexReader reader = new VectorIndexReader(input);
+ input.setReader(reader);
+ try {
+ VectorSearchResult result = reader.search(new float[] {0.0f}, 1, 1);
+ assertEquals(1, result.ids().length);
+ assertTrue(input.closeAttempted());
+ assertTrue(input.closeRejected());
+ assertEquals(2L, reader.totalVectors());
+ } finally {
+ reader.close();
+ }
+ }
+
+ private static byte[] buildIndexBytes() {
+ VectorIndexWriter writer = newPopulatedWriter();
+ ByteArrayPositionOutputStream output = new ByteArrayPositionOutputStream();
+ try {
+ writer.writeIndex(output);
+ return output.toByteArray();
+ } finally {
+ writer.close();
+ }
+ }
+
+ private static VectorIndexWriter newPopulatedWriter() {
+ VectorIndexWriter writer =
+ new VectorIndexWriter(VectorIndexConfig.ivfFlat(1, 1, Metric.L2));
+ writer.train(new float[] {0.0f, 1.0f}, 2);
+ writer.addVectors(new long[] {1L, 2L}, new float[] {0.0f, 1.0f}, 2);
+ return writer;
+ }
+
+ private static void assertEquals(int expected, int actual) {
+ if (expected != actual) {
+ throw new AssertionError("expected " + expected + " but got " + actual);
+ }
+ }
+
+ private static void assertEquals(long expected, long actual) {
+ if (expected != actual) {
+ throw new AssertionError("expected " + expected + " but got " + actual);
+ }
+ }
+
+ private static void assertTrue(boolean value) {
+ if (!value) {
+ throw new AssertionError("expected true");
+ }
+ }
+
+ public static final class ByteArrayPositionOutputStream {
+ private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ public void write(byte[] bytes) {
+ out.write(bytes, 0, bytes.length);
+ }
+
+ public byte[] toByteArray() {
+ return out.toByteArray();
+ }
+ }
+
+ public static final class SelfClosingOutputStream {
+ private final VectorIndexWriter writer;
+ private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ private boolean closeAttempted;
+ private boolean closeRejected;
+
+ SelfClosingOutputStream(VectorIndexWriter writer) {
+ this.writer = writer;
+ }
+
+ public void write(byte[] bytes) {
+ if (!closeAttempted) {
+ closeAttempted = true;
+ try {
+ writer.close();
+ } catch (IllegalStateException expected) {
+ closeRejected = true;
+ }
+ }
+ out.write(bytes, 0, bytes.length);
+ }
+
+ boolean closeAttempted() {
+ return closeAttempted;
+ }
+
+ boolean closeRejected() {
+ return closeRejected;
+ }
+
+ byte[] toByteArray() {
+ return out.toByteArray();
+ }
+ }
+
+ public static final class SelfClosingSeekableInputStream implements VectorIndexInput {
+ private final byte[] data;
+ private VectorIndexReader reader;
+ private boolean closeAttempted;
+ private boolean closeRejected;
+
+ SelfClosingSeekableInputStream(byte[] data) {
+ this.data = data.clone();
+ }
+
+ void setReader(VectorIndexReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void pread(long[] positions, byte[][] buffers) {
+ tryCloseReaderOnce();
+ if (positions.length != buffers.length) {
+ throw new IllegalArgumentException("positions and buffers length mismatch");
+ }
+ for (int i = 0; i < positions.length; i++) {
+ long readPosition = positions[i];
+ byte[] buffer = buffers[i];
+ if (readPosition < 0 || readPosition + buffer.length > data.length) {
+ throw new IllegalArgumentException("read out of range: " + readPosition);
+ }
+ System.arraycopy(data, (int) readPosition, buffer, 0, buffer.length);
+ }
+ }
+
+ boolean closeAttempted() {
+ return closeAttempted;
+ }
+
+ boolean closeRejected() {
+ return closeRejected;
+ }
+
+ private void tryCloseReaderOnce() {
+ if (reader == null || closeAttempted) {
+ return;
+ }
+ closeAttempted = true;
+ try {
+ reader.close();
+ } catch (IllegalStateException expected) {
+ closeRejected = true;
+ }
+ }
+ }
+}
diff --git a/jni/java/org/apache/paimon/index/ivfpq/VectorIndexReader.java b/jni/java/org/apache/paimon/index/ivfpq/VectorIndexReader.java
index d0da1f4..f6a5214 100644
--- a/jni/java/org/apache/paimon/index/ivfpq/VectorIndexReader.java
+++ b/jni/java/org/apache/paimon/index/ivfpq/VectorIndexReader.java
@@ -19,7 +19,9 @@ package org.apache.paimon.index.ivfpq;
public final class VectorIndexReader implements AutoCloseable {
+ private final Object nativeHandleLock = new Object();
private long nativePtr;
+ private Thread nativeHandleOwner;
private VectorIndexMetadata metadata;
public VectorIndexReader(VectorIndexInput input) {
@@ -38,10 +40,18 @@ public final class VectorIndexReader implements AutoCloseable {
}
public VectorIndexMetadata metadata() {
- if (metadata == null) {
- metadata = VectorIndexNative.metadata(requireOpen());
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ requireOpen();
+ if (metadata == null) {
+ metadata = VectorIndexNative.metadata(nativePtr);
+ }
+ return metadata;
+ } finally {
+ exitNativeHandle();
+ }
}
- return metadata;
}
public IndexType indexType() {
@@ -63,7 +73,14 @@ public final class VectorIndexReader implements AutoCloseable {
public VectorSearchResult search(float[] query, int topK, int nprobe, int efSearch) {
validateQuery(query);
validateSearchParams(topK, nprobe, efSearch);
- return VectorIndexNative.search(requireOpen(), query, topK, nprobe, efSearch);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ return VectorIndexNative.search(requireOpen(), query, topK, nprobe, efSearch);
+ } finally {
+ exitNativeHandle();
+ }
+ }
}
public VectorSearchResult search(float[] query, int topK, int nprobe, byte[] roaringFilter) {
@@ -77,8 +94,15 @@ public final class VectorIndexReader implements AutoCloseable {
throw new NullPointerException("roaringFilter");
}
validateSearchParams(topK, nprobe, efSearch);
- return VectorIndexNative.searchWithRoaringFilter(
- requireOpen(), query, topK, nprobe, efSearch, roaringFilter);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ return VectorIndexNative.searchWithRoaringFilter(
+ requireOpen(), query, topK, nprobe, efSearch, roaringFilter);
+ } finally {
+ exitNativeHandle();
+ }
+ }
}
public VectorSearchBatchResult searchBatch(
@@ -90,7 +114,15 @@ public final class VectorIndexReader implements AutoCloseable {
float[] queries, int queryCount, int topK, int nprobe, int efSearch) {
validateQueries(queries, queryCount);
validateSearchParams(topK, nprobe, efSearch);
- return VectorIndexNative.searchBatch(requireOpen(), queries, queryCount, topK, nprobe, efSearch);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ return VectorIndexNative.searchBatch(
+ requireOpen(), queries, queryCount, topK, nprobe, efSearch);
+ } finally {
+ exitNativeHandle();
+ }
+ }
}
public VectorSearchBatchResult searchBatch(
@@ -110,16 +142,30 @@ public final class VectorIndexReader implements AutoCloseable {
throw new NullPointerException("roaringFilter");
}
validateSearchParams(topK, nprobe, efSearch);
- return VectorIndexNative.searchBatchWithRoaringFilter(
- requireOpen(), queries, queryCount, topK, nprobe, efSearch, roaringFilter);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ return VectorIndexNative.searchBatchWithRoaringFilter(
+ requireOpen(), queries, queryCount, topK, nprobe, efSearch, roaringFilter);
+ } finally {
+ exitNativeHandle();
+ }
+ }
}
@Override
public void close() {
- long ptr = nativePtr;
- nativePtr = 0L;
- if (ptr != 0L) {
- VectorIndexNative.freeReader(ptr);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ long ptr = nativePtr;
+ nativePtr = 0L;
+ if (ptr != 0L) {
+ VectorIndexNative.freeReader(ptr);
+ }
+ } finally {
+ exitNativeHandle();
+ }
}
}
@@ -162,4 +208,16 @@ public final class VectorIndexReader implements AutoCloseable {
}
return nativePtr;
}
+
+ private void enterNativeHandle() {
+ Thread current = Thread.currentThread();
+ if (nativeHandleOwner == current) {
+ throw new IllegalStateException("VectorIndexReader native handle is already in use");
+ }
+ nativeHandleOwner = current;
+ }
+
+ private void exitNativeHandle() {
+ nativeHandleOwner = null;
+ }
}
diff --git a/jni/java/org/apache/paimon/index/ivfpq/VectorIndexWriter.java b/jni/java/org/apache/paimon/index/ivfpq/VectorIndexWriter.java
index f62d195..5bebe50 100644
--- a/jni/java/org/apache/paimon/index/ivfpq/VectorIndexWriter.java
+++ b/jni/java/org/apache/paimon/index/ivfpq/VectorIndexWriter.java
@@ -20,7 +20,9 @@ package org.apache.paimon.index.ivfpq;
public final class VectorIndexWriter implements AutoCloseable {
private final VectorIndexConfig config;
+ private final Object nativeHandleLock = new Object();
private long nativePtr;
+ private Thread nativeHandleOwner;
public VectorIndexWriter(VectorIndexConfig config) {
if (config == null) {
@@ -60,7 +62,14 @@ public final class VectorIndexWriter implements AutoCloseable {
public void train(float[] data, int vectorCount) {
validateVectors(data, vectorCount);
- VectorIndexNative.train(requireOpen(), data, vectorCount);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ VectorIndexNative.train(requireOpen(), data, vectorCount);
+ } finally {
+ exitNativeHandle();
+ }
+ }
}
public void addVectors(long[] ids, float[] data, int vectorCount) {
@@ -72,22 +81,43 @@ public final class VectorIndexWriter implements AutoCloseable {
throw new IllegalArgumentException(
"ids length " + ids.length + " < vectorCount " + vectorCount);
}
- VectorIndexNative.addVectors(requireOpen(), ids, data, vectorCount);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ VectorIndexNative.addVectors(requireOpen(), ids, data, vectorCount);
+ } finally {
+ exitNativeHandle();
+ }
+ }
}
public void writeIndex(Object output) {
if (output == null) {
throw new NullPointerException("output");
}
- VectorIndexNative.writeIndex(requireOpen(), output);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ VectorIndexNative.writeIndex(requireOpen(), output);
+ } finally {
+ exitNativeHandle();
+ }
+ }
}
@Override
public void close() {
- long ptr = nativePtr;
- nativePtr = 0L;
- if (ptr != 0L) {
- VectorIndexNative.freeWriter(ptr);
+ synchronized (nativeHandleLock) {
+ enterNativeHandle();
+ try {
+ long ptr = nativePtr;
+ nativePtr = 0L;
+ if (ptr != 0L) {
+ VectorIndexNative.freeWriter(ptr);
+ }
+ } finally {
+ exitNativeHandle();
+ }
}
}
@@ -112,4 +142,16 @@ public final class VectorIndexWriter implements AutoCloseable {
}
return nativePtr;
}
+
+ private void enterNativeHandle() {
+ Thread current = Thread.currentThread();
+ if (nativeHandleOwner == current) {
+ throw new IllegalStateException("VectorIndexWriter native handle is already in use");
+ }
+ nativeHandleOwner = current;
+ }
+
+ private void exitNativeHandle() {
+ nativeHandleOwner = null;
+ }
}