This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git
The following commit(s) were added to refs/heads/main by this push:
new a18e446 Guard JNI native calls against Rust panics (#21)
a18e446 is described below
commit a18e446f8a64aaa1fdd117dcbc12e02d1748b453
Author: QuakeWang <[email protected]>
AuthorDate: Tue Jun 9 21:38:07 2026 +0800
Guard JNI native calls against Rust panics (#21)
---
.github/workflows/ci.yml | 5 +
.../index/ivfpq/IVFPQNativePanicBoundaryTest.java | 160 +++
jni/src/lib.rs | 1215 +++++++++++---------
3 files changed, 805 insertions(+), 575 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 81e421c..71845a2 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -113,6 +113,11 @@ jobs:
- name: Build JNI library
run: cargo build -p paimon-vindex-jni --release
+ - name: Test JNI panic boundary
+ run: |
+ java -cp target/java-api-test
org.apache.paimon.index.ivfpq.IVFPQNativePanicBoundaryTest \
+ "$(pwd)/target/release/libpaimon_vindex_jni.so"
+
python-build:
runs-on: ubuntu-latest
steps:
diff --git
a/jni/java-test/org/apache/paimon/index/ivfpq/IVFPQNativePanicBoundaryTest.java
b/jni/java-test/org/apache/paimon/index/ivfpq/IVFPQNativePanicBoundaryTest.java
new file mode 100644
index 0000000..d9cc7ec
--- /dev/null
+++
b/jni/java-test/org/apache/paimon/index/ivfpq/IVFPQNativePanicBoundaryTest.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.paimon.index.ivfpq;
+
+import java.io.ByteArrayOutputStream;
+
+public class IVFPQNativePanicBoundaryTest {
+
+ public static void main(String[] args) {
+ if (args.length != 1) {
+ throw new IllegalArgumentException("native library path is
required");
+ }
+
+ System.load(args[0]);
+
+ testVoidEntrypointPanicBecomesRuntimeException();
+ testObjectEntrypointPanicBecomesRuntimeException();
+
+ IVFFlatWriter survivor = new IVFFlatWriter(1, 1, Metric.L2);
+ survivor.close();
+ }
+
+ private static void testVoidEntrypointPanicBecomesRuntimeException() {
+ final IVFFlatWriter writer = new IVFFlatWriter(1, 1, Metric.L2);
+ try {
+ assertThrows(RuntimeException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ writer.addVectors(new long[] {1L}, new float[] {1.0f}, 1);
+ }
+ });
+ } finally {
+ writer.close();
+ }
+ }
+
+ private static void testObjectEntrypointPanicBecomesRuntimeException() {
+ ByteArrayPositionOutputStream output = new
ByteArrayPositionOutputStream();
+ IVFFlatWriter writer = new IVFFlatWriter(1, 1, Metric.L2);
+ try {
+ writer.train(new float[] {0.0f, 1.0f}, 2);
+ writer.addVectors(new long[] {1L, 2L}, new float[] {Float.NaN,
1.0f}, 2);
+ writer.writeIndex(output);
+ } finally {
+ writer.close();
+ }
+
+ IVFFlatReader reader = new IVFFlatReader(new
ByteArraySeekableInputStream(output.toByteArray()));
+ try {
+ assertEquals(1, reader.dimension());
+ assertThrows(RuntimeException.class, new ThrowingRunnable() {
+ @Override
+ public void run() {
+ reader.search(new float[] {0.0f}, 2, 1);
+ }
+ });
+ assertEquals(2L, reader.totalVectors());
+ } finally {
+ reader.close();
+ }
+ }
+
+ private static void assertEquals(int expected, int actual) {
+ if (expected != actual) {
+ throw new AssertionError("expected " + expected + " but got " +
actual);
+ }
+ }
+
+ private static void assertEquals(long expected, long actual) {
+ if (expected != actual) {
+ throw new AssertionError("expected " + expected + " but got " +
actual);
+ }
+ }
+
+ private static void assertThrows(Class<? extends Throwable> expected,
ThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ if (expected.isInstance(t)) {
+ String message = t.getMessage();
+ if (message == null || !message.contains("Rust panic in JNI
call")) {
+ throw new AssertionError("unexpected exception message: "
+ message, t);
+ }
+ return;
+ }
+ throw new AssertionError("expected " + expected.getName() + " but
got " + t.getClass().getName(), t);
+ }
+ throw new AssertionError("expected " + expected.getName());
+ }
+
+ private interface ThrowingRunnable {
+ void run() throws Throwable;
+ }
+
+ public static final class ByteArrayPositionOutputStream {
+ private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ public void write(byte[] bytes) {
+ out.write(bytes, 0, bytes.length);
+ }
+
+ public byte[] toByteArray() {
+ return out.toByteArray();
+ }
+ }
+
+ public static final class ByteArraySeekableInputStream {
+ private final byte[] data;
+ private int position;
+
+ ByteArraySeekableInputStream(byte[] data) {
+ this.data = data.clone();
+ }
+
+ public void seek(long newPosition) {
+ if (newPosition < 0 || newPosition > data.length) {
+ throw new IllegalArgumentException("position out of range: " +
newPosition);
+ }
+ this.position = (int) newPosition;
+ }
+
+ public int read(byte[] buffer, int offset, int length) {
+ if (position >= data.length) {
+ return -1;
+ }
+ int bytesToRead = Math.min(length, data.length - position);
+ System.arraycopy(data, position, buffer, offset, bytesToRead);
+ position += bytesToRead;
+ return bytesToRead;
+ }
+
+ public int pread(long readPosition, byte[] buffer, int offset, int
length) {
+ if (readPosition < 0 || readPosition > data.length) {
+ return -1;
+ }
+ int start = (int) readPosition;
+ if (start >= data.length) {
+ return -1;
+ }
+ int bytesToRead = Math.min(length, data.length - start);
+ System.arraycopy(data, start, buffer, offset, bytesToRead);
+ return bytesToRead;
+ }
+ }
+}
diff --git a/jni/src/lib.rs b/jni/src/lib.rs
index ca0b079..8519d32 100644
--- a/jni/src/lib.rs
+++ b/jni/src/lib.rs
@@ -30,6 +30,8 @@ use paimon_vindex_core::ivfflat_io::{
use paimon_vindex_core::ivfpq::{
search_batch_reader, search_batch_reader_roaring_filter, IVFPQIndex,
};
+use std::any::Any;
+use std::panic::{catch_unwind, AssertUnwindSafe};
use stream::{JniOutputStream, JniSeekableStream};
fn throw_and_return<T: Default>(env: &mut JNIEnv, msg: &str) -> T {
@@ -37,6 +39,35 @@ fn throw_and_return<T: Default>(env: &mut JNIEnv, msg: &str)
-> T {
T::default()
}
+fn jni_call<T, F>(mut env: JNIEnv, f: F) -> T
+where
+ T: Default,
+ F: FnOnce(&mut JNIEnv) -> T,
+{
+ match catch_unwind(AssertUnwindSafe(|| f(&mut env))) {
+ Ok(value) => value,
+ Err(payload) => throw_panic_and_return(&mut env, &*payload),
+ }
+}
+
+fn jni_call_void<F>(env: JNIEnv, f: F)
+where
+ F: FnOnce(&mut JNIEnv),
+{
+ jni_call(env, |env| f(env))
+}
+
+fn throw_panic_and_return<T: Default>(env: &mut JNIEnv, payload: &(dyn Any +
Send)) -> T {
+ let payload = if let Some(message) = payload.downcast_ref::<&str>() {
+ *message
+ } else if let Some(message) = payload.downcast_ref::<String>() {
+ message.as_str()
+ } else {
+ "unknown panic payload"
+ };
+ throw_and_return(env, &format!("Rust panic in JNI call: {}", payload))
+}
+
fn deref_writer(ptr: jlong) -> Option<&'static mut IVFPQIndex> {
if ptr == 0 {
None
@@ -153,7 +184,7 @@ fn build_batch_result(
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_createWriter(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
d: jint,
nlist: jint,
@@ -161,246 +192,254 @@ pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_createWrit
metric: jint,
use_opq: jboolean,
) -> jlong {
- if d <= 0 || nlist <= 0 || m <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: d={}, nlist={}, m={}", d, nlist, m),
- );
- }
- if d % m != 0 {
- return throw_and_return(&mut env, &format!("d={} must be divisible by
m={}", d, m));
- }
+ jni_call(env, |env| {
+ if d <= 0 || nlist <= 0 || m <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: d={}, nlist={}, m={}", d, nlist,
m),
+ );
+ }
+ if d % m != 0 {
+ return throw_and_return(env, &format!("d={} must be divisible by
m={}", d, m));
+ }
- let metric_type = match MetricType::from_code(metric as u32) {
- Some(m) => m,
- None => return throw_and_return(&mut env, &format!("Unknown metric:
{}", metric)),
- };
+ let metric_type = match MetricType::from_code(metric as u32) {
+ Some(m) => m,
+ None => return throw_and_return(env, &format!("Unknown metric:
{}", metric)),
+ };
- let index = Box::new(IVFPQIndex::new(
- d as usize,
- nlist as usize,
- m as usize,
- metric_type,
- use_opq != 0,
- ));
- Box::into_raw(index) as jlong
+ let index = Box::new(IVFPQIndex::new(
+ d as usize,
+ nlist as usize,
+ m as usize,
+ metric_type,
+ use_opq != 0,
+ ));
+ Box::into_raw(index) as jlong
+ })
}
#[no_mangle]
pub extern "system" fn Java_org_apache_paimon_index_ivfpq_IVFPQNative_train(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
data: JFloatArray,
n: jint,
) {
- let index = match deref_writer(ptr) {
- Some(i) => i,
- None => return throw_and_return(&mut env, "null native pointer (writer
already freed?)"),
- };
+ jni_call_void(env, |env| {
+ let index = match deref_writer(ptr) {
+ Some(i) => i,
+ None => return throw_and_return(env, "null native pointer (writer
already freed?)"),
+ };
- if n <= 0 {
- return throw_and_return(&mut env, &format!("invalid n: {}", n));
- }
- let n = n as usize;
+ if n <= 0 {
+ return throw_and_return(env, &format!("invalid n: {}", n));
+ }
+ let n = n as usize;
- let len = match env.get_array_length(&data) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
+ let len = match env.get_array_length(&data) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
- if len < n * index.d {
- return throw_and_return(
- &mut env,
- &format!(
- "data array too short: {} < n*d={}*{}={}",
- len,
- n,
- index.d,
- n * index.d
- ),
- );
- }
+ if len < n * index.d {
+ return throw_and_return(
+ env,
+ &format!(
+ "data array too short: {} < n*d={}*{}={}",
+ len,
+ n,
+ index.d,
+ n * index.d
+ ),
+ );
+ }
- let mut buf = vec![0.0f32; len];
- if let Err(e) = env.get_float_array_region(&data, 0, &mut buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
+ let mut buf = vec![0.0f32; len];
+ if let Err(e) = env.get_float_array_region(&data, 0, &mut buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
- index.train(&buf, n);
+ index.train(&buf, n);
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_addVectors(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
ids: JLongArray,
data: JFloatArray,
n: jint,
) {
- let index = match deref_writer(ptr) {
- Some(i) => i,
- None => return throw_and_return(&mut env, "null native pointer (writer
already freed?)"),
- };
+ jni_call_void(env, |env| {
+ let index = match deref_writer(ptr) {
+ Some(i) => i,
+ None => return throw_and_return(env, "null native pointer (writer
already freed?)"),
+ };
- if n <= 0 {
- return throw_and_return(&mut env, &format!("invalid n: {}", n));
- }
- let n = n as usize;
+ if n <= 0 {
+ return throw_and_return(env, &format!("invalid n: {}", n));
+ }
+ let n = n as usize;
- let id_len = match env.get_array_length(&ids) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if id_len < n {
- return throw_and_return(
- &mut env,
- &format!("ids array too short: {} < n={}", id_len, n),
- );
- }
+ let id_len = match env.get_array_length(&ids) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if id_len < n {
+ return throw_and_return(env, &format!("ids array too short: {} <
n={}", id_len, n));
+ }
- let mut id_buf = vec![0i64; n];
- if let Err(e) = env.get_long_array_region(&ids, 0, &mut id_buf) {
- return throw_and_return(&mut env, &format!("get_long_array_region:
{}", e));
- }
+ let mut id_buf = vec![0i64; n];
+ if let Err(e) = env.get_long_array_region(&ids, 0, &mut id_buf) {
+ return throw_and_return(env, &format!("get_long_array_region: {}",
e));
+ }
- let data_len = match env.get_array_length(&data) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if data_len < n * index.d {
- return throw_and_return(
- &mut env,
- &format!("data array too short: {} < n*d={}", data_len, n *
index.d),
- );
- }
+ let data_len = match env.get_array_length(&data) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if data_len < n * index.d {
+ return throw_and_return(
+ env,
+ &format!("data array too short: {} < n*d={}", data_len, n *
index.d),
+ );
+ }
- let mut data_buf = vec![0.0f32; data_len];
- if let Err(e) = env.get_float_array_region(&data, 0, &mut data_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
+ let mut data_buf = vec![0.0f32; data_len];
+ if let Err(e) = env.get_float_array_region(&data, 0, &mut data_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
- index.add(&data_buf, &id_buf, n);
+ index.add(&data_buf, &id_buf, n);
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_writeIndex(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
stream_output: JObject,
) {
- let index = match deref_writer(ptr) {
- Some(i) => i,
- None => return throw_and_return(&mut env, "null native pointer (writer
already freed?)"),
- };
+ jni_call_void(env, |env| {
+ let index = match deref_writer(ptr) {
+ Some(i) => i,
+ None => return throw_and_return(env, "null native pointer (writer
already freed?)"),
+ };
- let jvm = match env.get_java_vm() {
- Ok(vm) => vm,
- Err(e) => return throw_and_return(&mut env, &format!("get_java_vm:
{}", e)),
- };
+ let jvm = match env.get_java_vm() {
+ Ok(vm) => vm,
+ Err(e) => return throw_and_return(env, &format!("get_java_vm: {}",
e)),
+ };
- let global_ref = match env.new_global_ref(stream_output) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env, &format!("new_global_ref:
{}", e)),
- };
+ let global_ref = match env.new_global_ref(stream_output) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env, &format!("new_global_ref:
{}", e)),
+ };
- let mut writer = JniOutputStream::new(jvm, global_ref);
- if let Err(e) = write_index(index, &mut writer) {
- throw_and_return::<()>(&mut env, &format!("write_index: {}", e));
- }
+ let mut writer = JniOutputStream::new(jvm, global_ref);
+ if let Err(e) = write_index(index, &mut writer) {
+ throw_and_return::<()>(env, &format!("write_index: {}", e));
+ }
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_freeWriter(
- _env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
) {
- if ptr != 0 {
- unsafe {
- drop(Box::from_raw(ptr as *mut IVFPQIndex));
+ jni_call_void(env, |_env| {
+ if ptr != 0 {
+ unsafe {
+ drop(Box::from_raw(ptr as *mut IVFPQIndex));
+ }
}
- }
+ })
}
// --- Reader API ---
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_openReader(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
stream_input: JObject,
) -> jlong {
- let jvm = match env.get_java_vm() {
- Ok(vm) => vm,
- Err(e) => return throw_and_return(&mut env, &format!("get_java_vm:
{}", e)),
- };
+ jni_call(env, |env| {
+ let jvm = match env.get_java_vm() {
+ Ok(vm) => vm,
+ Err(e) => return throw_and_return(env, &format!("get_java_vm: {}",
e)),
+ };
- let global_ref = match env.new_global_ref(stream_input) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env, &format!("new_global_ref:
{}", e)),
- };
+ let global_ref = match env.new_global_ref(stream_input) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env, &format!("new_global_ref:
{}", e)),
+ };
- let stream = JniSeekableStream::new(jvm, global_ref);
- let reader = match IVFPQIndexReader::open(stream) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env, &format!("open: {}", e)),
- };
+ let stream = JniSeekableStream::new(jvm, global_ref);
+ let reader = match IVFPQIndexReader::open(stream) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env, &format!("open: {}", e)),
+ };
- Box::into_raw(Box::new(reader)) as jlong
+ Box::into_raw(Box::new(reader)) as jlong
+ })
}
#[no_mangle]
pub extern "system" fn Java_org_apache_paimon_index_ivfpq_IVFPQNative_search(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
query: JFloatArray,
k: jint,
nprobe: jint,
) -> jobject {
- let reader = match deref_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
+ jni_call(env, |env| {
+ let reader = match deref_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
- if k <= 0 || nprobe <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: k={}, nprobe={}", k, nprobe),
- );
- }
+ if k <= 0 || nprobe <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: k={}, nprobe={}", k, nprobe),
+ );
+ }
- let d = reader.d;
- let query_len = match env.get_array_length(&query) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if query_len != d {
- return throw_and_return(
- &mut env,
- &format!("query array length {} != d={}", query_len, d),
- );
- }
+ let d = reader.d;
+ let query_len = match env.get_array_length(&query) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if query_len != d {
+ return throw_and_return(env, &format!("query array length {} !=
d={}", query_len, d));
+ }
- let mut query_buf = vec![0.0f32; d];
- if let Err(e) = env.get_float_array_region(&query, 0, &mut query_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
+ let mut query_buf = vec![0.0f32; d];
+ if let Err(e) = env.get_float_array_region(&query, 0, &mut query_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
- let (ids, dists) = match reader.search(&query_buf, k as usize, nprobe as
usize) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env, &format!("search: {}", e)),
- };
+ let (ids, dists) = match reader.search(&query_buf, k as usize, nprobe
as usize) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env, &format!("search: {}", e)),
+ };
- build_result(&mut env, ids, dists)
+ build_result(env, ids, dists)
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_searchWithRoaringFilter(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
query: JFloatArray,
@@ -408,86 +447,89 @@ pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_searchWith
nprobe: jint,
roaring_filter: JByteArray,
) -> jobject {
- let reader = match deref_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
+ jni_call(env, |env| {
+ let reader = match deref_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
- if k <= 0 || nprobe <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: k={}, nprobe={}", k, nprobe),
- );
- }
+ if k <= 0 || nprobe <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: k={}, nprobe={}", k, nprobe),
+ );
+ }
- let d = reader.d;
- let query_len = match env.get_array_length(&query) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if query_len != d {
- return throw_and_return(
- &mut env,
- &format!("query array length {} != d={}", query_len, d),
- );
- }
+ let d = reader.d;
+ let query_len = match env.get_array_length(&query) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if query_len != d {
+ return throw_and_return(env, &format!("query array length {} !=
d={}", query_len, d));
+ }
- let mut query_buf = vec![0.0f32; d];
- if let Err(e) = env.get_float_array_region(&query, 0, &mut query_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
+ let mut query_buf = vec![0.0f32; d];
+ if let Err(e) = env.get_float_array_region(&query, 0, &mut query_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
- let filter_bytes = match read_byte_array(&mut env, roaring_filter) {
- Ok(bytes) => bytes,
- Err(e) => return throw_and_return(&mut env, &e),
- };
+ let filter_bytes = match read_byte_array(env, roaring_filter) {
+ Ok(bytes) => bytes,
+ Err(e) => return throw_and_return(env, &e),
+ };
- let (ids, dists) = match reader.search_with_roaring_filter(
- &query_buf,
- k as usize,
- nprobe as usize,
- &filter_bytes,
- ) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env,
&format!("search_with_filter: {}", e)),
- };
+ let (ids, dists) = match reader.search_with_roaring_filter(
+ &query_buf,
+ k as usize,
+ nprobe as usize,
+ &filter_bytes,
+ ) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env,
&format!("search_with_filter: {}", e)),
+ };
- build_result(&mut env, ids, dists)
+ build_result(env, ids, dists)
+ })
}
// --- Reader metadata ---
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_getDimension(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
) -> jint {
- let reader = match deref_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
- reader.d as jint
+ jni_call(env, |env| {
+ let reader = match deref_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
+ reader.d as jint
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_getTotalVectors(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
) -> jlong {
- let reader = match deref_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
- reader.total_vectors
+ jni_call(env, |env| {
+ let reader = match deref_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
+ reader.total_vectors
+ })
}
// --- Batch search ---
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_searchBatch(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
queries: JFloatArray,
@@ -495,50 +537,52 @@ pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_searchBatc
k: jint,
nprobe: jint,
) -> jobject {
- let reader = match deref_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
+ jni_call(env, |env| {
+ let reader = match deref_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
- if nq <= 0 || k <= 0 || nprobe <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: nq={}, k={}, nprobe={}", nq, k,
nprobe),
- );
- }
+ if nq <= 0 || k <= 0 || nprobe <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: nq={}, k={}, nprobe={}", nq, k,
nprobe),
+ );
+ }
- let d = reader.d;
- let nq = nq as usize;
- let k = k as usize;
+ let d = reader.d;
+ let nq = nq as usize;
+ let k = k as usize;
- let query_len = match env.get_array_length(&queries) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if query_len != nq * d {
- return throw_and_return(
- &mut env,
- &format!("queries array length {} != nq*d={}", query_len, nq * d),
- );
- }
+ let query_len = match env.get_array_length(&queries) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if query_len != nq * d {
+ return throw_and_return(
+ env,
+ &format!("queries array length {} != nq*d={}", query_len, nq *
d),
+ );
+ }
- let mut query_buf = vec![0.0f32; nq * d];
- if let Err(e) = env.get_float_array_region(&queries, 0, &mut query_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
+ let mut query_buf = vec![0.0f32; nq * d];
+ if let Err(e) = env.get_float_array_region(&queries, 0, &mut
query_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
- let (all_ids, all_dists) = match search_batch_reader(reader, &query_buf,
nq, k, nprobe as usize)
- {
- Ok(result) => result,
- Err(e) => return throw_and_return(&mut env, &format!("search_batch:
{}", e)),
- };
+ let (all_ids, all_dists) =
+ match search_batch_reader(reader, &query_buf, nq, k, nprobe as
usize) {
+ Ok(result) => result,
+ Err(e) => return throw_and_return(env, &format!("search_batch:
{}", e)),
+ };
- build_batch_result(&mut env, all_ids, all_dists, nq, k)
+ build_batch_result(env, all_ids, all_dists, nq, k)
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_searchBatchWithRoaringFilter(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
queries: JFloatArray,
@@ -547,287 +591,299 @@ pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_searchBatc
nprobe: jint,
roaring_filter: JByteArray,
) -> jobject {
- let reader = match deref_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
+ jni_call(env, |env| {
+ let reader = match deref_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
- if nq <= 0 || k <= 0 || nprobe <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: nq={}, k={}, nprobe={}", nq, k,
nprobe),
- );
- }
+ if nq <= 0 || k <= 0 || nprobe <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: nq={}, k={}, nprobe={}", nq, k,
nprobe),
+ );
+ }
- let d = reader.d;
- let nq = nq as usize;
- let k = k as usize;
+ let d = reader.d;
+ let nq = nq as usize;
+ let k = k as usize;
- let query_len = match env.get_array_length(&queries) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if query_len != nq * d {
- return throw_and_return(
- &mut env,
- &format!("queries array length {} != nq*d={}", query_len, nq * d),
- );
- }
+ let query_len = match env.get_array_length(&queries) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if query_len != nq * d {
+ return throw_and_return(
+ env,
+ &format!("queries array length {} != nq*d={}", query_len, nq *
d),
+ );
+ }
- let mut query_buf = vec![0.0f32; nq * d];
- if let Err(e) = env.get_float_array_region(&queries, 0, &mut query_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
+ let mut query_buf = vec![0.0f32; nq * d];
+ if let Err(e) = env.get_float_array_region(&queries, 0, &mut
query_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
- let filter_bytes = match read_byte_array(&mut env, roaring_filter) {
- Ok(bytes) => bytes,
- Err(e) => return throw_and_return(&mut env, &e),
- };
+ let filter_bytes = match read_byte_array(env, roaring_filter) {
+ Ok(bytes) => bytes,
+ Err(e) => return throw_and_return(env, &e),
+ };
- let (all_ids, all_dists) = match search_batch_reader_roaring_filter(
- reader,
- &query_buf,
- nq,
- k,
- nprobe as usize,
- &filter_bytes,
- ) {
- Ok(result) => result,
- Err(e) => return throw_and_return(&mut env,
&format!("search_batch_with_filter: {}", e)),
- };
+ let (all_ids, all_dists) = match search_batch_reader_roaring_filter(
+ reader,
+ &query_buf,
+ nq,
+ k,
+ nprobe as usize,
+ &filter_bytes,
+ ) {
+ Ok(result) => result,
+ Err(e) => return throw_and_return(env,
&format!("search_batch_with_filter: {}", e)),
+ };
- build_batch_result(&mut env, all_ids, all_dists, nq, k)
+ build_batch_result(env, all_ids, all_dists, nq, k)
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFPQNative_freeReader(
- _env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
) {
- if ptr != 0 {
- unsafe {
- drop(Box::from_raw(
- ptr as *mut IVFPQIndexReader<JniSeekableStream>,
- ));
+ jni_call_void(env, |_env| {
+ if ptr != 0 {
+ unsafe {
+ drop(Box::from_raw(
+ ptr as *mut IVFPQIndexReader<JniSeekableStream>,
+ ));
+ }
}
- }
+ })
}
// --- IVF-FLAT Writer API ---
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_createWriter(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
d: jint,
nlist: jint,
metric: jint,
) -> jlong {
- if d <= 0 || nlist <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: d={}, nlist={}", d, nlist),
- );
- }
+ jni_call(env, |env| {
+ if d <= 0 || nlist <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: d={}, nlist={}", d, nlist),
+ );
+ }
- let metric_type = match MetricType::from_code(metric as u32) {
- Some(m) => m,
- None => return throw_and_return(&mut env, &format!("Unknown metric:
{}", metric)),
- };
+ let metric_type = match MetricType::from_code(metric as u32) {
+ Some(m) => m,
+ None => return throw_and_return(env, &format!("Unknown metric:
{}", metric)),
+ };
- let index = Box::new(IVFFlatIndex::new(d as usize, nlist as usize,
metric_type));
- Box::into_raw(index) as jlong
+ let index = Box::new(IVFFlatIndex::new(d as usize, nlist as usize,
metric_type));
+ Box::into_raw(index) as jlong
+ })
}
#[no_mangle]
pub extern "system" fn Java_org_apache_paimon_index_ivfpq_IVFFlatNative_train(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
data: JFloatArray,
n: jint,
) {
- let index = match deref_flat_writer(ptr) {
- Some(i) => i,
- None => return throw_and_return(&mut env, "null native pointer (writer
already freed?)"),
- };
- if n <= 0 {
- return throw_and_return(&mut env, &format!("invalid n: {}", n));
- }
- let n = n as usize;
- let len = match env.get_array_length(&data) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if len < n * index.d {
- return throw_and_return(
- &mut env,
- &format!("data array too short: {} < n*d={}", len, n * index.d),
- );
- }
- let mut buf = vec![0.0f32; len];
- if let Err(e) = env.get_float_array_region(&data, 0, &mut buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
- index.train(&buf, n);
+ jni_call_void(env, |env| {
+ let index = match deref_flat_writer(ptr) {
+ Some(i) => i,
+ None => return throw_and_return(env, "null native pointer (writer
already freed?)"),
+ };
+ if n <= 0 {
+ return throw_and_return(env, &format!("invalid n: {}", n));
+ }
+ let n = n as usize;
+ let len = match env.get_array_length(&data) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if len < n * index.d {
+ return throw_and_return(
+ env,
+ &format!("data array too short: {} < n*d={}", len, n *
index.d),
+ );
+ }
+ let mut buf = vec![0.0f32; len];
+ if let Err(e) = env.get_float_array_region(&data, 0, &mut buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
+ index.train(&buf, n);
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_addVectors(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
ids: JLongArray,
data: JFloatArray,
n: jint,
) {
- let index = match deref_flat_writer(ptr) {
- Some(i) => i,
- None => return throw_and_return(&mut env, "null native pointer (writer
already freed?)"),
- };
- if n <= 0 {
- return throw_and_return(&mut env, &format!("invalid n: {}", n));
- }
- let n = n as usize;
- let id_len = match env.get_array_length(&ids) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if id_len < n {
- return throw_and_return(
- &mut env,
- &format!("ids array too short: {} < n={}", id_len, n),
- );
- }
- let mut id_buf = vec![0i64; n];
- if let Err(e) = env.get_long_array_region(&ids, 0, &mut id_buf) {
- return throw_and_return(&mut env, &format!("get_long_array_region:
{}", e));
- }
+ jni_call_void(env, |env| {
+ let index = match deref_flat_writer(ptr) {
+ Some(i) => i,
+ None => return throw_and_return(env, "null native pointer (writer
already freed?)"),
+ };
+ if n <= 0 {
+ return throw_and_return(env, &format!("invalid n: {}", n));
+ }
+ let n = n as usize;
+ let id_len = match env.get_array_length(&ids) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if id_len < n {
+ return throw_and_return(env, &format!("ids array too short: {} <
n={}", id_len, n));
+ }
+ let mut id_buf = vec![0i64; n];
+ if let Err(e) = env.get_long_array_region(&ids, 0, &mut id_buf) {
+ return throw_and_return(env, &format!("get_long_array_region: {}",
e));
+ }
- let data_len = match env.get_array_length(&data) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if data_len < n * index.d {
- return throw_and_return(
- &mut env,
- &format!("data array too short: {} < n*d={}", data_len, n *
index.d),
- );
- }
- let mut data_buf = vec![0.0f32; data_len];
- if let Err(e) = env.get_float_array_region(&data, 0, &mut data_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
- index.add(&data_buf, &id_buf, n);
+ let data_len = match env.get_array_length(&data) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if data_len < n * index.d {
+ return throw_and_return(
+ env,
+ &format!("data array too short: {} < n*d={}", data_len, n *
index.d),
+ );
+ }
+ let mut data_buf = vec![0.0f32; data_len];
+ if let Err(e) = env.get_float_array_region(&data, 0, &mut data_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
+ index.add(&data_buf, &id_buf, n);
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_writeIndex(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
stream_output: JObject,
) {
- let index = match deref_flat_writer(ptr) {
- Some(i) => i,
- None => return throw_and_return(&mut env, "null native pointer (writer
already freed?)"),
- };
- let jvm = match env.get_java_vm() {
- Ok(vm) => vm,
- Err(e) => return throw_and_return(&mut env, &format!("get_java_vm:
{}", e)),
- };
- let global_ref = match env.new_global_ref(stream_output) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env, &format!("new_global_ref:
{}", e)),
- };
- let mut writer = JniOutputStream::new(jvm, global_ref);
- if let Err(e) = write_ivfflat_index(index, &mut writer) {
- throw_and_return::<()>(&mut env, &format!("write_ivfflat_index: {}",
e));
- }
+ jni_call_void(env, |env| {
+ let index = match deref_flat_writer(ptr) {
+ Some(i) => i,
+ None => return throw_and_return(env, "null native pointer (writer
already freed?)"),
+ };
+ let jvm = match env.get_java_vm() {
+ Ok(vm) => vm,
+ Err(e) => return throw_and_return(env, &format!("get_java_vm: {}",
e)),
+ };
+ let global_ref = match env.new_global_ref(stream_output) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env, &format!("new_global_ref:
{}", e)),
+ };
+ let mut writer = JniOutputStream::new(jvm, global_ref);
+ if let Err(e) = write_ivfflat_index(index, &mut writer) {
+ throw_and_return::<()>(env, &format!("write_ivfflat_index: {}",
e));
+ }
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_freeWriter(
- _env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
) {
- if ptr != 0 {
- unsafe {
- drop(Box::from_raw(ptr as *mut IVFFlatIndex));
+ jni_call_void(env, |_env| {
+ if ptr != 0 {
+ unsafe {
+ drop(Box::from_raw(ptr as *mut IVFFlatIndex));
+ }
}
- }
+ })
}
// --- IVF-FLAT Reader API ---
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_openReader(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
stream_input: JObject,
) -> jlong {
- let jvm = match env.get_java_vm() {
- Ok(vm) => vm,
- Err(e) => return throw_and_return(&mut env, &format!("get_java_vm:
{}", e)),
- };
- let global_ref = match env.new_global_ref(stream_input) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env, &format!("new_global_ref:
{}", e)),
- };
- let stream = JniSeekableStream::new(jvm, global_ref);
- let reader = match IVFFlatIndexReader::open(stream) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env, &format!("open: {}", e)),
- };
- Box::into_raw(Box::new(reader)) as jlong
+ jni_call(env, |env| {
+ let jvm = match env.get_java_vm() {
+ Ok(vm) => vm,
+ Err(e) => return throw_and_return(env, &format!("get_java_vm: {}",
e)),
+ };
+ let global_ref = match env.new_global_ref(stream_input) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env, &format!("new_global_ref:
{}", e)),
+ };
+ let stream = JniSeekableStream::new(jvm, global_ref);
+ let reader = match IVFFlatIndexReader::open(stream) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env, &format!("open: {}", e)),
+ };
+ Box::into_raw(Box::new(reader)) as jlong
+ })
}
#[no_mangle]
pub extern "system" fn Java_org_apache_paimon_index_ivfpq_IVFFlatNative_search(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
query: JFloatArray,
k: jint,
nprobe: jint,
) -> jobject {
- let reader = match deref_flat_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
- if k <= 0 || nprobe <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: k={}, nprobe={}", k, nprobe),
- );
- }
- let d = reader.d;
- let query_len = match env.get_array_length(&query) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if query_len != d {
- return throw_and_return(
- &mut env,
- &format!("query array length {} != d={}", query_len, d),
- );
- }
- let mut query_buf = vec![0.0f32; d];
- if let Err(e) = env.get_float_array_region(&query, 0, &mut query_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
- let (ids, dists) = match reader.search(&query_buf, k as usize, nprobe as
usize) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env, &format!("search: {}", e)),
- };
- build_result(&mut env, ids, dists)
+ jni_call(env, |env| {
+ let reader = match deref_flat_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
+ if k <= 0 || nprobe <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: k={}, nprobe={}", k, nprobe),
+ );
+ }
+ let d = reader.d;
+ let query_len = match env.get_array_length(&query) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if query_len != d {
+ return throw_and_return(env, &format!("query array length {} !=
d={}", query_len, d));
+ }
+ let mut query_buf = vec![0.0f32; d];
+ if let Err(e) = env.get_float_array_region(&query, 0, &mut query_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
+ let (ids, dists) = match reader.search(&query_buf, k as usize, nprobe
as usize) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env, &format!("search: {}", e)),
+ };
+ build_result(env, ids, dists)
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_searchWithRoaringFilter(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
query: JFloatArray,
@@ -835,50 +891,49 @@ pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_searchWi
nprobe: jint,
roaring_filter: JByteArray,
) -> jobject {
- let reader = match deref_flat_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
- if k <= 0 || nprobe <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: k={}, nprobe={}", k, nprobe),
- );
- }
- let d = reader.d;
- let query_len = match env.get_array_length(&query) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if query_len != d {
- return throw_and_return(
- &mut env,
- &format!("query array length {} != d={}", query_len, d),
- );
- }
- let mut query_buf = vec![0.0f32; d];
- if let Err(e) = env.get_float_array_region(&query, 0, &mut query_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
- let filter_bytes = match read_byte_array(&mut env, roaring_filter) {
- Ok(bytes) => bytes,
- Err(e) => return throw_and_return(&mut env, &e),
- };
- let (ids, dists) = match reader.search_with_roaring_filter(
- &query_buf,
- k as usize,
- nprobe as usize,
- &filter_bytes,
- ) {
- Ok(r) => r,
- Err(e) => return throw_and_return(&mut env,
&format!("search_with_filter: {}", e)),
- };
- build_result(&mut env, ids, dists)
+ jni_call(env, |env| {
+ let reader = match deref_flat_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
+ if k <= 0 || nprobe <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: k={}, nprobe={}", k, nprobe),
+ );
+ }
+ let d = reader.d;
+ let query_len = match env.get_array_length(&query) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if query_len != d {
+ return throw_and_return(env, &format!("query array length {} !=
d={}", query_len, d));
+ }
+ let mut query_buf = vec![0.0f32; d];
+ if let Err(e) = env.get_float_array_region(&query, 0, &mut query_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
+ let filter_bytes = match read_byte_array(env, roaring_filter) {
+ Ok(bytes) => bytes,
+ Err(e) => return throw_and_return(env, &e),
+ };
+ let (ids, dists) = match reader.search_with_roaring_filter(
+ &query_buf,
+ k as usize,
+ nprobe as usize,
+ &filter_bytes,
+ ) {
+ Ok(r) => r,
+ Err(e) => return throw_and_return(env,
&format!("search_with_filter: {}", e)),
+ };
+ build_result(env, ids, dists)
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_searchBatch(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
queries: JFloatArray,
@@ -886,48 +941,50 @@ pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_searchBa
k: jint,
nprobe: jint,
) -> jobject {
- let reader = match deref_flat_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
- if nq <= 0 || k <= 0 || nprobe <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: nq={}, k={}, nprobe={}", nq, k,
nprobe),
- );
- }
+ jni_call(env, |env| {
+ let reader = match deref_flat_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
+ if nq <= 0 || k <= 0 || nprobe <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: nq={}, k={}, nprobe={}", nq, k,
nprobe),
+ );
+ }
- let d = reader.d;
- let nq = nq as usize;
- let k = k as usize;
- let query_len = match env.get_array_length(&queries) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if query_len != nq * d {
- return throw_and_return(
- &mut env,
- &format!("queries array length {} != nq*d={}", query_len, nq * d),
- );
- }
+ let d = reader.d;
+ let nq = nq as usize;
+ let k = k as usize;
+ let query_len = match env.get_array_length(&queries) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if query_len != nq * d {
+ return throw_and_return(
+ env,
+ &format!("queries array length {} != nq*d={}", query_len, nq *
d),
+ );
+ }
- let mut query_buf = vec![0.0f32; nq * d];
- if let Err(e) = env.get_float_array_region(&queries, 0, &mut query_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
+ let mut query_buf = vec![0.0f32; nq * d];
+ if let Err(e) = env.get_float_array_region(&queries, 0, &mut
query_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
- let (all_ids, all_dists) =
- match search_batch_ivfflat_reader(reader, &query_buf, nq, k, nprobe as
usize) {
- Ok(result) => result,
- Err(e) => return throw_and_return(&mut env,
&format!("search_batch: {}", e)),
- };
+ let (all_ids, all_dists) =
+ match search_batch_ivfflat_reader(reader, &query_buf, nq, k,
nprobe as usize) {
+ Ok(result) => result,
+ Err(e) => return throw_and_return(env, &format!("search_batch:
{}", e)),
+ };
- build_batch_result(&mut env, all_ids, all_dists, nq, k)
+ build_batch_result(env, all_ids, all_dists, nq, k)
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_searchBatchWithRoaringFilter(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
queries: JFloatArray,
@@ -936,92 +993,100 @@ pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_searchBa
nprobe: jint,
roaring_filter: JByteArray,
) -> jobject {
- let reader = match deref_flat_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
- if nq <= 0 || k <= 0 || nprobe <= 0 {
- return throw_and_return(
- &mut env,
- &format!("invalid parameters: nq={}, k={}, nprobe={}", nq, k,
nprobe),
- );
- }
+ jni_call(env, |env| {
+ let reader = match deref_flat_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
+ if nq <= 0 || k <= 0 || nprobe <= 0 {
+ return throw_and_return(
+ env,
+ &format!("invalid parameters: nq={}, k={}, nprobe={}", nq, k,
nprobe),
+ );
+ }
- let d = reader.d;
- let nq = nq as usize;
- let k = k as usize;
- let query_len = match env.get_array_length(&queries) {
- Ok(l) => l as usize,
- Err(e) => return throw_and_return(&mut env,
&format!("get_array_length: {}", e)),
- };
- if query_len != nq * d {
- return throw_and_return(
- &mut env,
- &format!("queries array length {} != nq*d={}", query_len, nq * d),
- );
- }
+ let d = reader.d;
+ let nq = nq as usize;
+ let k = k as usize;
+ let query_len = match env.get_array_length(&queries) {
+ Ok(l) => l as usize,
+ Err(e) => return throw_and_return(env, &format!("get_array_length:
{}", e)),
+ };
+ if query_len != nq * d {
+ return throw_and_return(
+ env,
+ &format!("queries array length {} != nq*d={}", query_len, nq *
d),
+ );
+ }
- let mut query_buf = vec![0.0f32; nq * d];
- if let Err(e) = env.get_float_array_region(&queries, 0, &mut query_buf) {
- return throw_and_return(&mut env, &format!("get_float_array_region:
{}", e));
- }
+ let mut query_buf = vec![0.0f32; nq * d];
+ if let Err(e) = env.get_float_array_region(&queries, 0, &mut
query_buf) {
+ return throw_and_return(env, &format!("get_float_array_region:
{}", e));
+ }
- let filter_bytes = match read_byte_array(&mut env, roaring_filter) {
- Ok(bytes) => bytes,
- Err(e) => return throw_and_return(&mut env, &e),
- };
- let (all_ids, all_dists) = match
search_batch_ivfflat_reader_roaring_filter(
- reader,
- &query_buf,
- nq,
- k,
- nprobe as usize,
- &filter_bytes,
- ) {
- Ok(result) => result,
- Err(e) => return throw_and_return(&mut env,
&format!("search_batch_with_filter: {}", e)),
- };
+ let filter_bytes = match read_byte_array(env, roaring_filter) {
+ Ok(bytes) => bytes,
+ Err(e) => return throw_and_return(env, &e),
+ };
+ let (all_ids, all_dists) = match
search_batch_ivfflat_reader_roaring_filter(
+ reader,
+ &query_buf,
+ nq,
+ k,
+ nprobe as usize,
+ &filter_bytes,
+ ) {
+ Ok(result) => result,
+ Err(e) => return throw_and_return(env,
&format!("search_batch_with_filter: {}", e)),
+ };
- build_batch_result(&mut env, all_ids, all_dists, nq, k)
+ build_batch_result(env, all_ids, all_dists, nq, k)
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_getDimension(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
) -> jint {
- let reader = match deref_flat_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
- reader.d as jint
+ jni_call(env, |env| {
+ let reader = match deref_flat_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
+ reader.d as jint
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_getTotalVectors(
- mut env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
) -> jlong {
- let reader = match deref_flat_reader(ptr) {
- Some(r) => r,
- None => return throw_and_return(&mut env, "null native pointer (reader
already freed?)"),
- };
- reader.total_vectors
+ jni_call(env, |env| {
+ let reader = match deref_flat_reader(ptr) {
+ Some(r) => r,
+ None => return throw_and_return(env, "null native pointer (reader
already freed?)"),
+ };
+ reader.total_vectors
+ })
}
#[no_mangle]
pub extern "system" fn
Java_org_apache_paimon_index_ivfpq_IVFFlatNative_freeReader(
- _env: JNIEnv,
+ env: JNIEnv,
_class: JClass,
ptr: jlong,
) {
- if ptr != 0 {
- unsafe {
- drop(Box::from_raw(
- ptr as *mut IVFFlatIndexReader<JniSeekableStream>,
- ));
+ jni_call_void(env, |_env| {
+ if ptr != 0 {
+ unsafe {
+ drop(Box::from_raw(
+ ptr as *mut IVFFlatIndexReader<JniSeekableStream>,
+ ));
+ }
}
- }
+ })
}