This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git


The following commit(s) were added to refs/heads/main by this push:
     new a95b679  Guard JNI native handle lifecycle (#27)
a95b679 is described below

commit a95b6793e2df94948eb3dd22d322543e167d5ca8
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 10 17:57:32 2026 +0800

    Guard JNI native handle lifecycle (#27)
---
 .github/workflows/ci.yml                           |   2 +
 .../paimon/index/ivfpq/VectorIndexJavaApiTest.java |  71 ++++++++
 .../ivfpq/VectorIndexNativeHandleSafetyTest.java   | 198 +++++++++++++++++++++
 .../paimon/index/ivfpq/VectorIndexReader.java      |  84 +++++++--
 .../paimon/index/ivfpq/VectorIndexWriter.java      |  56 +++++-
 5 files changed, 391 insertions(+), 20 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 307394b..2293a62 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -117,6 +117,8 @@ jobs:
         run: |
           java -cp target/java-api-test org.apache.paimon.index.ivfpq.VectorIndexNativePanicBoundaryTest \
             "$(pwd)/target/release/libpaimon_vindex_jni.so"
+          java -cp target/java-api-test org.apache.paimon.index.ivfpq.VectorIndexNativeHandleSafetyTest \
+            "$(pwd)/target/release/libpaimon_vindex_jni.so"
 
   python-build:
     runs-on: ubuntu-latest
diff --git a/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexJavaApiTest.java b/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexJavaApiTest.java
index 4ea2a68..d24badc 100644
--- a/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexJavaApiTest.java
+++ b/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexJavaApiTest.java
@@ -28,6 +28,8 @@ public class VectorIndexJavaApiTest {
         testBatchResultCopiesArraysAndSlicesRows();
         testConfigValidation();
         testMetadata();
+        testClosedReaderRejectsOperations();
+        testClosedWriterRejectsOperations();
         testReaderAndWriterApiCompile();
     }
 
@@ -139,6 +141,75 @@ public class VectorIndexJavaApiTest {
         assertEquals(20, metadata.hnsw().m());
     }
 
+    private static void testClosedReaderRejectsOperations() {
+        final VectorIndexReader reader = VectorIndexReader.fromNativePointerForTesting(0L);
+        reader.close();
+        reader.close();
+
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                reader.metadata();
+            }
+        });
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                reader.indexType();
+            }
+        });
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                reader.dimension();
+            }
+        });
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                reader.totalVectors();
+            }
+        });
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                reader.search(new float[] {0.0f}, 1, 1);
+            }
+        });
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                reader.searchBatch(new float[] {0.0f}, 1, 1, 1);
+            }
+        });
+    }
+
+    private static void testClosedWriterRejectsOperations() {
+        VectorIndexConfig config = VectorIndexConfig.ivfPq(2, 4, 1, Metric.L2, false);
+        final VectorIndexWriter writer = VectorIndexWriter.fromNativePointerForTesting(0L, config);
+        writer.close();
+        writer.close();
+
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                writer.train(new float[] {0.0f, 1.0f}, 1);
+            }
+        });
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                writer.addVectors(new long[] {1L}, new float[] {0.0f, 1.0f}, 1);
+            }
+        });
+        assertThrows(IllegalStateException.class, new ThrowingRunnable() {
+            @Override
+            public void run() {
+                writer.writeIndex(new Object());
+            }
+        });
+    }
+
     private static void testReaderAndWriterApiCompile() {
         VectorIndexConfig config = VectorIndexConfig.ivfPq(2, 4, 1, Metric.L2, false);
         VectorIndexReader closedReader = VectorIndexReader.fromNativePointerForTesting(0L);
diff --git a/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexNativeHandleSafetyTest.java b/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexNativeHandleSafetyTest.java
new file mode 100644
index 0000000..7dc93f4
--- /dev/null
+++ b/jni/java-test/org/apache/paimon/index/ivfpq/VectorIndexNativeHandleSafetyTest.java
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.paimon.index.ivfpq;
+
+import java.io.ByteArrayOutputStream;
+
+public class VectorIndexNativeHandleSafetyTest {
+
+    public static void main(String[] args) {
+        if (args.length != 1) {
+            throw new IllegalArgumentException("native library path is required");
+        }
+
+        System.load(args[0]);
+
+        testWriterRejectsReentrantCloseDuringNativeCall();
+        testReaderRejectsReentrantCloseDuringNativeCall();
+    }
+
+    private static void testWriterRejectsReentrantCloseDuringNativeCall() {
+        VectorIndexWriter writer = newPopulatedWriter();
+        SelfClosingOutputStream output = new SelfClosingOutputStream(writer);
+        try {
+            writer.writeIndex(output);
+            assertTrue(output.closeAttempted());
+            assertTrue(output.closeRejected());
+            assertTrue(output.toByteArray().length > 0);
+        } finally {
+            writer.close();
+        }
+    }
+
+    private static void testReaderRejectsReentrantCloseDuringNativeCall() {
+        SelfClosingSeekableInputStream input =
+                new SelfClosingSeekableInputStream(buildIndexBytes());
+        VectorIndexReader reader = new VectorIndexReader(input);
+        input.setReader(reader);
+        try {
+            VectorSearchResult result = reader.search(new float[] {0.0f}, 1, 1);
+            assertEquals(1, result.ids().length);
+            assertTrue(input.closeAttempted());
+            assertTrue(input.closeRejected());
+            assertEquals(2L, reader.totalVectors());
+        } finally {
+            reader.close();
+        }
+    }
+
+    private static byte[] buildIndexBytes() {
+        VectorIndexWriter writer = newPopulatedWriter();
+        ByteArrayPositionOutputStream output = new ByteArrayPositionOutputStream();
+        try {
+            writer.writeIndex(output);
+            return output.toByteArray();
+        } finally {
+            writer.close();
+        }
+    }
+
+    private static VectorIndexWriter newPopulatedWriter() {
+        VectorIndexWriter writer =
+                new VectorIndexWriter(VectorIndexConfig.ivfFlat(1, 1, Metric.L2));
+        writer.train(new float[] {0.0f, 1.0f}, 2);
+        writer.addVectors(new long[] {1L, 2L}, new float[] {0.0f, 1.0f}, 2);
+        return writer;
+    }
+
+    private static void assertEquals(int expected, int actual) {
+        if (expected != actual) {
+            throw new AssertionError("expected " + expected + " but got " + actual);
+        }
+    }
+
+    private static void assertEquals(long expected, long actual) {
+        if (expected != actual) {
+            throw new AssertionError("expected " + expected + " but got " + actual);
+        }
+    }
+
+    private static void assertTrue(boolean value) {
+        if (!value) {
+            throw new AssertionError("expected true");
+        }
+    }
+
+    public static final class ByteArrayPositionOutputStream {
+        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+        public void write(byte[] bytes) {
+            out.write(bytes, 0, bytes.length);
+        }
+
+        public byte[] toByteArray() {
+            return out.toByteArray();
+        }
+    }
+
+    public static final class SelfClosingOutputStream {
+        private final VectorIndexWriter writer;
+        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        private boolean closeAttempted;
+        private boolean closeRejected;
+
+        SelfClosingOutputStream(VectorIndexWriter writer) {
+            this.writer = writer;
+        }
+
+        public void write(byte[] bytes) {
+            if (!closeAttempted) {
+                closeAttempted = true;
+                try {
+                    writer.close();
+                } catch (IllegalStateException expected) {
+                    closeRejected = true;
+                }
+            }
+            out.write(bytes, 0, bytes.length);
+        }
+
+        boolean closeAttempted() {
+            return closeAttempted;
+        }
+
+        boolean closeRejected() {
+            return closeRejected;
+        }
+
+        byte[] toByteArray() {
+            return out.toByteArray();
+        }
+    }
+
+    public static final class SelfClosingSeekableInputStream implements VectorIndexInput {
+        private final byte[] data;
+        private VectorIndexReader reader;
+        private boolean closeAttempted;
+        private boolean closeRejected;
+
+        SelfClosingSeekableInputStream(byte[] data) {
+            this.data = data.clone();
+        }
+
+        void setReader(VectorIndexReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public void pread(long[] positions, byte[][] buffers) {
+            tryCloseReaderOnce();
+            if (positions.length != buffers.length) {
+                throw new IllegalArgumentException("positions and buffers length mismatch");
+            }
+            for (int i = 0; i < positions.length; i++) {
+                long readPosition = positions[i];
+                byte[] buffer = buffers[i];
+                if (readPosition < 0 || readPosition + buffer.length > data.length) {
+                    throw new IllegalArgumentException("read out of range: " + readPosition);
+                }
+                System.arraycopy(data, (int) readPosition, buffer, 0, buffer.length);
+            }
+        }
+
+        boolean closeAttempted() {
+            return closeAttempted;
+        }
+
+        boolean closeRejected() {
+            return closeRejected;
+        }
+
+        private void tryCloseReaderOnce() {
+            if (reader == null || closeAttempted) {
+                return;
+            }
+            closeAttempted = true;
+            try {
+                reader.close();
+            } catch (IllegalStateException expected) {
+                closeRejected = true;
+            }
+        }
+    }
+}
diff --git a/jni/java/org/apache/paimon/index/ivfpq/VectorIndexReader.java b/jni/java/org/apache/paimon/index/ivfpq/VectorIndexReader.java
index d0da1f4..f6a5214 100644
--- a/jni/java/org/apache/paimon/index/ivfpq/VectorIndexReader.java
+++ b/jni/java/org/apache/paimon/index/ivfpq/VectorIndexReader.java
@@ -19,7 +19,9 @@ package org.apache.paimon.index.ivfpq;
 
 public final class VectorIndexReader implements AutoCloseable {
 
+    private final Object nativeHandleLock = new Object();
     private long nativePtr;
+    private Thread nativeHandleOwner;
     private VectorIndexMetadata metadata;
 
     public VectorIndexReader(VectorIndexInput input) {
@@ -38,10 +40,18 @@ public final class VectorIndexReader implements AutoCloseable {
     }
 
     public VectorIndexMetadata metadata() {
-        if (metadata == null) {
-            metadata = VectorIndexNative.metadata(requireOpen());
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                requireOpen();
+                if (metadata == null) {
+                    metadata = VectorIndexNative.metadata(nativePtr);
+                }
+                return metadata;
+            } finally {
+                exitNativeHandle();
+            }
         }
-        return metadata;
     }
 
     public IndexType indexType() {
@@ -63,7 +73,14 @@ public final class VectorIndexReader implements AutoCloseable {
     public VectorSearchResult search(float[] query, int topK, int nprobe, int efSearch) {
         validateQuery(query);
         validateSearchParams(topK, nprobe, efSearch);
-        return VectorIndexNative.search(requireOpen(), query, topK, nprobe, efSearch);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                return VectorIndexNative.search(requireOpen(), query, topK, nprobe, efSearch);
+            } finally {
+                exitNativeHandle();
+            }
+        }
     }
 
     public VectorSearchResult search(float[] query, int topK, int nprobe, byte[] roaringFilter) {
@@ -77,8 +94,15 @@ public final class VectorIndexReader implements AutoCloseable {
             throw new NullPointerException("roaringFilter");
         }
         validateSearchParams(topK, nprobe, efSearch);
-        return VectorIndexNative.searchWithRoaringFilter(
-                requireOpen(), query, topK, nprobe, efSearch, roaringFilter);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                return VectorIndexNative.searchWithRoaringFilter(
+                        requireOpen(), query, topK, nprobe, efSearch, roaringFilter);
+            } finally {
+                exitNativeHandle();
+            }
+        }
     }
 
     public VectorSearchBatchResult searchBatch(
@@ -90,7 +114,15 @@ public final class VectorIndexReader implements AutoCloseable {
             float[] queries, int queryCount, int topK, int nprobe, int efSearch) {
         validateQueries(queries, queryCount);
         validateSearchParams(topK, nprobe, efSearch);
-        return VectorIndexNative.searchBatch(requireOpen(), queries, queryCount, topK, nprobe, efSearch);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                return VectorIndexNative.searchBatch(
+                        requireOpen(), queries, queryCount, topK, nprobe, efSearch);
+            } finally {
+                exitNativeHandle();
+            }
+        }
     }
 
     public VectorSearchBatchResult searchBatch(
@@ -110,16 +142,30 @@ public final class VectorIndexReader implements AutoCloseable {
             throw new NullPointerException("roaringFilter");
         }
         validateSearchParams(topK, nprobe, efSearch);
-        return VectorIndexNative.searchBatchWithRoaringFilter(
-                requireOpen(), queries, queryCount, topK, nprobe, efSearch, roaringFilter);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                return VectorIndexNative.searchBatchWithRoaringFilter(
+                        requireOpen(), queries, queryCount, topK, nprobe, efSearch, roaringFilter);
+            } finally {
+                exitNativeHandle();
+            }
+        }
     }
 
     @Override
     public void close() {
-        long ptr = nativePtr;
-        nativePtr = 0L;
-        if (ptr != 0L) {
-            VectorIndexNative.freeReader(ptr);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                long ptr = nativePtr;
+                nativePtr = 0L;
+                if (ptr != 0L) {
+                    VectorIndexNative.freeReader(ptr);
+                }
+            } finally {
+                exitNativeHandle();
+            }
         }
     }
 
@@ -162,4 +208,16 @@ public final class VectorIndexReader implements AutoCloseable {
         }
         return nativePtr;
     }
+
+    private void enterNativeHandle() {
+        Thread current = Thread.currentThread();
+        if (nativeHandleOwner == current) {
+            throw new IllegalStateException("VectorIndexReader native handle is already in use");
+        }
+        nativeHandleOwner = current;
+    }
+
+    private void exitNativeHandle() {
+        nativeHandleOwner = null;
+    }
 }
diff --git a/jni/java/org/apache/paimon/index/ivfpq/VectorIndexWriter.java b/jni/java/org/apache/paimon/index/ivfpq/VectorIndexWriter.java
index f62d195..5bebe50 100644
--- a/jni/java/org/apache/paimon/index/ivfpq/VectorIndexWriter.java
+++ b/jni/java/org/apache/paimon/index/ivfpq/VectorIndexWriter.java
@@ -20,7 +20,9 @@ package org.apache.paimon.index.ivfpq;
 public final class VectorIndexWriter implements AutoCloseable {
 
     private final VectorIndexConfig config;
+    private final Object nativeHandleLock = new Object();
     private long nativePtr;
+    private Thread nativeHandleOwner;
 
     public VectorIndexWriter(VectorIndexConfig config) {
         if (config == null) {
@@ -60,7 +62,14 @@ public final class VectorIndexWriter implements AutoCloseable {
 
     public void train(float[] data, int vectorCount) {
         validateVectors(data, vectorCount);
-        VectorIndexNative.train(requireOpen(), data, vectorCount);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                VectorIndexNative.train(requireOpen(), data, vectorCount);
+            } finally {
+                exitNativeHandle();
+            }
+        }
     }
 
     public void addVectors(long[] ids, float[] data, int vectorCount) {
@@ -72,22 +81,43 @@ public final class VectorIndexWriter implements AutoCloseable {
             throw new IllegalArgumentException(
                     "ids length " + ids.length + " < vectorCount " + vectorCount);
         }
-        VectorIndexNative.addVectors(requireOpen(), ids, data, vectorCount);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                VectorIndexNative.addVectors(requireOpen(), ids, data, vectorCount);
+            } finally {
+                exitNativeHandle();
+            }
+        }
     }
 
     public void writeIndex(Object output) {
         if (output == null) {
             throw new NullPointerException("output");
         }
-        VectorIndexNative.writeIndex(requireOpen(), output);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                VectorIndexNative.writeIndex(requireOpen(), output);
+            } finally {
+                exitNativeHandle();
+            }
+        }
     }
 
     @Override
     public void close() {
-        long ptr = nativePtr;
-        nativePtr = 0L;
-        if (ptr != 0L) {
-            VectorIndexNative.freeWriter(ptr);
+        synchronized (nativeHandleLock) {
+            enterNativeHandle();
+            try {
+                long ptr = nativePtr;
+                nativePtr = 0L;
+                if (ptr != 0L) {
+                    VectorIndexNative.freeWriter(ptr);
+                }
+            } finally {
+                exitNativeHandle();
+            }
         }
     }
 
@@ -112,4 +142,16 @@ public final class VectorIndexWriter implements 
AutoCloseable {
         }
         return nativePtr;
     }
+
+    private void enterNativeHandle() {
+        Thread current = Thread.currentThread();
+        if (nativeHandleOwner == current) {
+            throw new IllegalStateException("VectorIndexWriter native handle is already in use");
+        }
+        nativeHandleOwner = current;
+    }
+
+    private void exitNativeHandle() {
+        nativeHandleOwner = null;
+    }
 }

Reply via email to