This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new d49118493 chore: Remove needless from_raw calls (#2638)
d49118493 is described below
commit d49118493c6fbc2a3d8175339b053ac91c58d4cd
Author: Emily Matheys <[email protected]>
AuthorDate: Thu Oct 23 21:43:32 2025 +0300
chore: Remove needless from_raw calls (#2638)
---
native/core/src/execution/jni_api.rs | 135 ++++++++++++++-------------------
native/core/src/parquet/mod.rs | 141 ++++++++++++++---------------------
native/core/src/parquet/util/jni.rs | 7 +-
3 files changed, 114 insertions(+), 169 deletions(-)
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index d35347196..716d20e98 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -50,20 +50,16 @@ use datafusion_spark::function::string::char::CharFunc;
use futures::poll;
use futures::stream::StreamExt;
use jni::objects::JByteBuffer;
-use jni::sys::JNI_FALSE;
+use jni::sys::{jlongArray, JNI_FALSE};
use jni::{
errors::Result as JNIResult,
objects::{
- JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray,
JPrimitiveArray, JString,
+ GlobalRef, JByteArray, JClass, JIntArray, JLongArray, JObject,
JObjectArray, JString,
ReleaseMode,
},
- sys::{jbyteArray, jint, jlong, jlongArray},
+ sys::{jboolean, jdouble, jint, jlong},
JNIEnv,
};
-use jni::{
- objects::GlobalRef,
- sys::{jboolean, jdouble, jintArray, jobjectArray, jstring},
-};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
@@ -159,17 +155,17 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
e: JNIEnv,
_class: JClass,
id: jlong,
- iterators: jobjectArray,
- serialized_query: jbyteArray,
- serialized_spark_configs: jbyteArray,
+ iterators: JObjectArray,
+ serialized_query: JByteArray,
+ serialized_spark_configs: JByteArray,
partition_count: jint,
metrics_node: JObject,
metrics_update_interval: jlong,
comet_task_memory_manager_obj: JObject,
- local_dirs: jobjectArray,
+ local_dirs: JObjectArray,
batch_size: jint,
off_heap_mode: jboolean,
- memory_pool_type: jstring,
+ memory_pool_type: JString,
memory_limit: jlong,
memory_limit_per_task: jlong,
task_attempt_id: jlong,
@@ -177,8 +173,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
// Deserialize Spark configs
- let array = unsafe {
JPrimitiveArray::from_raw(serialized_spark_configs) };
- let bytes = env.convert_byte_array(array)?;
+ let bytes = env.convert_byte_array(serialized_spark_configs)?;
let spark_configs = serde::deserialize_config(bytes.as_slice())?;
let spark_config: HashMap<String, String> =
spark_configs.entries.into_iter().collect();
@@ -196,18 +191,16 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
let start = Instant::now();
// Deserialize query plan
- let array = unsafe { JPrimitiveArray::from_raw(serialized_query) };
- let bytes = env.convert_byte_array(array)?;
+ let bytes = env.convert_byte_array(serialized_query)?;
let spark_plan = serde::deserialize_op(bytes.as_slice())?;
let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?);
// Get the global references of input sources
let mut input_sources = vec![];
- let iter_array = JObjectArray::from_raw(iterators);
- let num_inputs = env.get_array_length(&iter_array)?;
+ let num_inputs = env.get_array_length(&iterators)?;
for i in 0..num_inputs {
- let input_source = env.get_object_array_element(&iter_array,
i)?;
+ let input_source = env.get_object_array_element(&iterators,
i)?;
let input_source = Arc::new(jni_new_global_ref!(env,
input_source)?);
input_sources.push(input_source);
}
@@ -216,7 +209,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
let task_memory_manager =
Arc::new(jni_new_global_ref!(env,
comet_task_memory_manager_obj)?);
- let memory_pool_type =
env.get_string(&JString::from_raw(memory_pool_type))?.into();
+ let memory_pool_type = env.get_string(&memory_pool_type)?.into();
let memory_pool_config = parse_memory_pool_config(
off_heap_mode != JNI_FALSE,
memory_pool_type,
@@ -227,13 +220,12 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
create_memory_pool(&memory_pool_config, task_memory_manager,
task_attempt_id);
// Get local directories for storing spill files
- let local_dirs_array = JObjectArray::from_raw(local_dirs);
- let num_local_dirs = env.get_array_length(&local_dirs_array)?;
- let mut local_dirs = vec![];
+ let num_local_dirs = env.get_array_length(&local_dirs)?;
+ let mut local_dirs_vec = vec![];
for i in 0..num_local_dirs {
- let local_dir: JString =
env.get_object_array_element(&local_dirs_array, i)?.into();
+ let local_dir: JString =
env.get_object_array_element(&local_dirs, i)?.into();
let local_dir = env.get_string(&local_dir)?;
- local_dirs.push(local_dir.into());
+ local_dirs_vec.push(local_dir.into());
}
// We need to keep the session context alive. Some session state
like temporary
@@ -242,7 +234,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
let session = prepare_datafusion_session_context(
batch_size as usize,
memory_pool,
- local_dirs,
+ local_dirs_vec,
max_temp_directory_size,
)?;
@@ -344,21 +336,17 @@ fn prepare_datafusion_session_context(
/// Prepares arrow arrays for output.
fn prepare_output(
env: &mut JNIEnv,
- array_addrs: jlongArray,
- schema_addrs: jlongArray,
+ array_addrs: JLongArray,
+ schema_addrs: JLongArray,
output_batch: RecordBatch,
validate: bool,
) -> CometResult<jlong> {
- let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
- let num_cols = env.get_array_length(&array_address_array)? as usize;
+ let num_cols = env.get_array_length(&array_addrs)? as usize;
- let array_addrs =
- unsafe { env.get_array_elements(&array_address_array,
ReleaseMode::NoCopyBack)? };
+ let array_addrs = unsafe { env.get_array_elements(&array_addrs,
ReleaseMode::NoCopyBack)? };
let array_addrs = &*array_addrs;
- let schema_address_array = unsafe { JLongArray::from_raw(schema_addrs) };
- let schema_addrs =
- unsafe { env.get_array_elements(&schema_address_array,
ReleaseMode::NoCopyBack)? };
+ let schema_addrs = unsafe { env.get_array_elements(&schema_addrs,
ReleaseMode::NoCopyBack)? };
let schema_addrs = &*schema_addrs;
let results = output_batch.columns();
@@ -441,8 +429,8 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_executePlan(
stage_id: jint,
partition: jint,
exec_context: jlong,
- array_addrs: jlongArray,
- schema_addrs: jlongArray,
+ array_addrs: JLongArray,
+ schema_addrs: JLongArray,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
// Retrieve the query
@@ -599,24 +587,21 @@ fn update_metrics(env: &mut JNIEnv, exec_context: &mut
ExecutionContext) -> Come
fn convert_datatype_arrays(
env: &'_ mut JNIEnv<'_>,
- serialized_datatypes: jobjectArray,
+ serialized_datatypes: JObjectArray,
) -> JNIResult<Vec<ArrowDataType>> {
- unsafe {
- let obj_array = JObjectArray::from_raw(serialized_datatypes);
- let array_len = env.get_array_length(&obj_array)?;
- let mut res: Vec<ArrowDataType> = Vec::new();
-
- for i in 0..array_len {
- let inner_array = env.get_object_array_element(&obj_array, i)?;
- let inner_array: JByteArray = inner_array.into();
- let bytes = env.convert_byte_array(inner_array)?;
- let data_type =
serde::deserialize_data_type(bytes.as_slice()).unwrap();
- let arrow_dt = to_arrow_datatype(&data_type);
- res.push(arrow_dt);
- }
-
- Ok(res)
+ let array_len = env.get_array_length(&serialized_datatypes)?;
+ let mut res: Vec<ArrowDataType> = Vec::new();
+
+ for i in 0..array_len {
+ let inner_array = env.get_object_array_element(&serialized_datatypes,
i)?;
+ let inner_array: JByteArray = inner_array.into();
+ let bytes = env.convert_byte_array(inner_array)?;
+ let data_type =
serde::deserialize_data_type(bytes.as_slice()).unwrap();
+ let arrow_dt = to_arrow_datatype(&data_type);
+ res.push(arrow_dt);
}
+
+ Ok(res)
}
fn get_execution_context<'a>(id: i64) -> &'a mut ExecutionContext {
@@ -634,16 +619,16 @@ fn get_execution_context<'a>(id: i64) -> &'a mut
ExecutionContext {
pub unsafe extern "system" fn
Java_org_apache_comet_Native_writeSortedFileNative(
e: JNIEnv,
_class: JClass,
- row_addresses: jlongArray,
- row_sizes: jintArray,
- serialized_datatypes: jobjectArray,
- file_path: jstring,
+ row_addresses: JLongArray,
+ row_sizes: JIntArray,
+ serialized_datatypes: JObjectArray,
+ file_path: JString,
prefer_dictionary_ratio: jdouble,
batch_size: jlong,
checksum_enabled: jboolean,
checksum_algo: jint,
current_checksum: jlong,
- compression_codec: jstring,
+ compression_codec: JString,
compression_level: jint,
tracing_enabled: jboolean,
) -> jlongArray {
@@ -654,21 +639,16 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_writeSortedFileNative
|| {
let data_types = convert_datatype_arrays(&mut env,
serialized_datatypes)?;
- let row_address_array = JLongArray::from_raw(row_addresses);
- let row_num = env.get_array_length(&row_address_array)? as
usize;
+ let row_num = env.get_array_length(&row_addresses)? as usize;
let row_addresses =
- env.get_array_elements(&row_address_array,
ReleaseMode::NoCopyBack)?;
+ env.get_array_elements(&row_addresses,
ReleaseMode::NoCopyBack)?;
- let row_size_array = JIntArray::from_raw(row_sizes);
- let row_sizes = env.get_array_elements(&row_size_array,
ReleaseMode::NoCopyBack)?;
+ let row_sizes = env.get_array_elements(&row_sizes,
ReleaseMode::NoCopyBack)?;
let row_addresses_ptr = row_addresses.as_ptr();
let row_sizes_ptr = row_sizes.as_ptr();
- let output_path: String = env
- .get_string(&JString::from_raw(file_path))
- .unwrap()
- .into();
+ let output_path: String =
env.get_string(&file_path).unwrap().into();
let checksum_enabled = checksum_enabled == 1;
let current_checksum = if current_checksum == i64::MIN {
@@ -678,10 +658,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_writeSortedFileNative
Some(current_checksum as u32)
};
- let compression_codec: String = env
- .get_string(&JString::from_raw(compression_codec))
- .unwrap()
- .into();
+ let compression_codec: String =
env.get_string(&compression_codec).unwrap().into();
let compression_codec = match compression_codec.as_str() {
"zstd" => CompressionCodec::Zstd(compression_level),
@@ -754,8 +731,8 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_decodeShuffleBlock(
_class: JClass,
byte_buffer: JByteBuffer,
length: jint,
- array_addrs: jlongArray,
- schema_addrs: jlongArray,
+ array_addrs: JLongArray,
+ schema_addrs: JLongArray,
tracing_enabled: jboolean,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
@@ -775,10 +752,10 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_decodeShuffleBlock(
pub unsafe extern "system" fn Java_org_apache_comet_Native_traceBegin(
e: JNIEnv,
_class: JClass,
- event: jstring,
+ event: JString,
) {
try_unwrap_or_throw(&e, |mut env| {
- let name: String =
env.get_string(&JString::from_raw(event)).unwrap().into();
+ let name: String = env.get_string(&event).unwrap().into();
trace_begin(&name);
Ok(())
})
@@ -790,10 +767,10 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_traceBegin(
pub unsafe extern "system" fn Java_org_apache_comet_Native_traceEnd(
e: JNIEnv,
_class: JClass,
- event: jstring,
+ event: JString,
) {
try_unwrap_or_throw(&e, |mut env| {
- let name: String =
env.get_string(&JString::from_raw(event)).unwrap().into();
+ let name: String = env.get_string(&event).unwrap().into();
trace_end(&name);
Ok(())
})
@@ -805,11 +782,11 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_traceEnd(
pub unsafe extern "system" fn Java_org_apache_comet_Native_logMemoryUsage(
e: JNIEnv,
_class: JClass,
- name: jstring,
+ name: JString,
value: jlong,
) {
try_unwrap_or_throw(&e, |mut env| {
- let name: String =
env.get_string(&JString::from_raw(name)).unwrap().into();
+ let name: String = env.get_string(&name).unwrap().into();
log_memory_usage(&name, value as u64);
Ok(())
})
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index ca70c2fc3..57aa89ff1 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -41,10 +41,7 @@ use arrow::ffi::FFI_ArrowArray;
use jni::JNIEnv;
use jni::{
objects::{GlobalRef, JByteBuffer, JClass},
- sys::{
- jboolean, jbooleanArray, jbyte, jbyteArray, jdouble, jfloat, jint,
jintArray, jlong,
- jlongArray, jobject, jobjectArray, jshort,
- },
+ sys::{jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort},
};
use self::util::jni::TypePromotionInfo;
@@ -66,9 +63,9 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{SessionConfig, SessionContext};
use futures::{poll, StreamExt};
use jni::objects::{
- JBooleanArray, JByteArray, JLongArray, JMap, JObject, JPrimitiveArray,
JString, ReleaseMode,
+ JBooleanArray, JByteArray, JLongArray, JMap, JObject, JObjectArray,
JString, ReleaseMode,
};
-use jni::sys::{jstring, JNI_FALSE};
+use jni::sys::{jintArray, JNI_FALSE};
use object_store::path::Path;
use read::ColumnReader;
use util::jni::{convert_column_descriptor, convert_encoding,
deserialize_schema};
@@ -86,7 +83,7 @@ pub extern "system" fn
Java_org_apache_comet_parquet_Native_initColumnReader(
primitive_type: jint,
logical_type: jint,
read_primitive_type: jint,
- jni_path: jobjectArray,
+ jni_path: JObjectArray,
max_dl: jint,
max_rl: jint,
bit_width: jint,
@@ -148,7 +145,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setDictionary
_jclass: JClass,
handle: jlong,
page_value_count: jint,
- page_data: jbyteArray,
+ page_data: JByteArray,
encoding: jint,
) {
try_unwrap_or_throw(&e, |env| {
@@ -158,10 +155,9 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setDictionary
let encoding = convert_encoding(encoding);
// copy the input on-heap buffer to native
- let page_data_array = unsafe { JPrimitiveArray::from_raw(page_data) };
- let page_len = env.get_array_length(&page_data_array)?;
+ let page_len = env.get_array_length(&page_data)?;
let mut buffer = MutableBuffer::from_len_zeroed(page_len as usize);
- env.get_byte_array_region(&page_data_array, 0,
from_u8_slice(buffer.as_slice_mut()))?;
+ env.get_byte_array_region(&page_data, 0,
from_u8_slice(buffer.as_slice_mut()))?;
reader.set_dictionary_page(page_value_count as usize, buffer.into(),
encoding);
Ok(())
@@ -176,7 +172,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setPageV1(
_jclass: JClass,
handle: jlong,
page_value_count: jint,
- page_data: jbyteArray,
+ page_data: JByteArray,
value_encoding: jint,
) {
try_unwrap_or_throw(&e, |env| {
@@ -186,10 +182,9 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setPageV1(
let encoding = convert_encoding(value_encoding);
// copy the input on-heap buffer to native
- let page_data_array = unsafe { JPrimitiveArray::from_raw(page_data) };
- let page_len = env.get_array_length(&page_data_array)?;
+ let page_len = env.get_array_length(&page_data)?;
let mut buffer = MutableBuffer::from_len_zeroed(page_len as usize);
- env.get_byte_array_region(&page_data_array, 0,
from_u8_slice(buffer.as_slice_mut()))?;
+ env.get_byte_array_region(&page_data, 0,
from_u8_slice(buffer.as_slice_mut()))?;
reader.set_page_v1(page_value_count as usize, buffer.into(), encoding);
Ok(())
@@ -204,7 +199,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setPageBuffer
_jclass: JClass,
handle: jlong,
page_value_count: jint,
- buffer: jobject,
+ buffer: JByteBuffer,
value_encoding: jint,
) {
try_unwrap_or_throw(&e, |env| {
@@ -214,15 +209,12 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setPageBuffer
// convert value encoding ordinal to the native encoding definition
let encoding = convert_encoding(value_encoding);
- // Get slices from Java DirectByteBuffer
- let jbuffer = unsafe { JByteBuffer::from_raw(buffer) };
-
// Convert the page to global reference so it won't get GC'd by Java.
Also free the last
// page if there is any.
- ctx.last_data_page = Some(env.new_global_ref(&jbuffer)?);
+ ctx.last_data_page = Some(env.new_global_ref(&buffer)?);
- let buf_slice = env.get_direct_buffer_address(&jbuffer)?;
- let buf_capacity = env.get_direct_buffer_capacity(&jbuffer)?;
+ let buf_slice = env.get_direct_buffer_address(&buffer)?;
+ let buf_capacity = env.get_direct_buffer_capacity(&buffer)?;
unsafe {
let page_ptr = NonNull::new_unchecked(buf_slice);
@@ -245,9 +237,9 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setPageV2(
_jclass: JClass,
handle: jlong,
page_value_count: jint,
- def_level_data: jbyteArray,
- rep_level_data: jbyteArray,
- value_data: jbyteArray,
+ def_level_data: JByteArray,
+ rep_level_data: JByteArray,
+ value_data: JByteArray,
value_encoding: jint,
) {
try_unwrap_or_throw(&e, |env| {
@@ -257,20 +249,17 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setPageV2(
let encoding = convert_encoding(value_encoding);
// copy the input on-heap buffer to native
- let def_level_array = unsafe {
JPrimitiveArray::from_raw(def_level_data) };
- let dl_len = env.get_array_length(&def_level_array)?;
+ let dl_len = env.get_array_length(&def_level_data)?;
let mut dl_buffer = MutableBuffer::from_len_zeroed(dl_len as usize);
- env.get_byte_array_region(&def_level_array, 0,
from_u8_slice(dl_buffer.as_slice_mut()))?;
+ env.get_byte_array_region(&def_level_data, 0,
from_u8_slice(dl_buffer.as_slice_mut()))?;
- let rep_level_array = unsafe {
JPrimitiveArray::from_raw(rep_level_data) };
- let rl_len = env.get_array_length(&rep_level_array)?;
+ let rl_len = env.get_array_length(&rep_level_data)?;
let mut rl_buffer = MutableBuffer::from_len_zeroed(rl_len as usize);
- env.get_byte_array_region(&rep_level_array, 0,
from_u8_slice(rl_buffer.as_slice_mut()))?;
+ env.get_byte_array_region(&rep_level_data, 0,
from_u8_slice(rl_buffer.as_slice_mut()))?;
- let value_array = unsafe { JPrimitiveArray::from_raw(value_data) };
- let v_len = env.get_array_length(&value_array)?;
+ let v_len = env.get_array_length(&value_data)?;
let mut v_buffer = MutableBuffer::from_len_zeroed(v_len as usize);
- env.get_byte_array_region(&value_array, 0,
from_u8_slice(v_buffer.as_slice_mut()))?;
+ env.get_byte_array_region(&value_data, 0,
from_u8_slice(v_buffer.as_slice_mut()))?;
reader.set_page_v2(
page_value_count as usize,
@@ -401,15 +390,14 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setBinary(
e: JNIEnv,
_jclass: JClass,
handle: jlong,
- value: jbyteArray,
+ value: JByteArray,
) {
try_unwrap_or_throw(&e, |env| {
let reader = get_reader(handle)?;
- let value_array = unsafe { JPrimitiveArray::from_raw(value) };
- let len = env.get_array_length(&value_array)?;
+ let len = env.get_array_length(&value)?;
let mut buffer = MutableBuffer::from_len_zeroed(len as usize);
- env.get_byte_array_region(&value_array, 0,
from_u8_slice(buffer.as_slice_mut()))?;
+ env.get_byte_array_region(&value, 0,
from_u8_slice(buffer.as_slice_mut()))?;
reader.set_binary(buffer);
Ok(())
})
@@ -422,15 +410,14 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setDecimal(
e: JNIEnv,
_jclass: JClass,
handle: jlong,
- value: jbyteArray,
+ value: JByteArray,
) {
try_unwrap_or_throw(&e, |env| {
let reader = get_reader(handle)?;
- let value_array = unsafe { JPrimitiveArray::from_raw(value) };
- let len = env.get_array_length(&value_array)?;
+ let len = env.get_array_length(&value)?;
let mut buffer = MutableBuffer::from_len_zeroed(len as usize);
- env.get_byte_array_region(&value_array, 0,
from_u8_slice(buffer.as_slice_mut()))?;
+ env.get_byte_array_region(&value, 0,
from_u8_slice(buffer.as_slice_mut()))?;
reader.set_decimal_flba(buffer);
Ok(())
})
@@ -460,12 +447,11 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setIndices(
handle: jlong,
offset: jlong,
batch_size: jint,
- indices: jlongArray,
+ indices: JLongArray,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
let reader = get_reader(handle)?;
- let indice_array = unsafe { JLongArray::from_raw(indices) };
- let indices = unsafe { env.get_array_elements(&indice_array,
ReleaseMode::NoCopyBack)? };
+ let indices = unsafe { env.get_array_elements(&indices,
ReleaseMode::NoCopyBack)? };
let len = indices.len();
// paris alternately contains start index and length of continuous
indices
let pairs = unsafe { core::slice::from_raw_parts_mut(indices.as_ptr(),
len) };
@@ -496,15 +482,14 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_setIsDeleted(
e: JNIEnv,
_jclass: JClass,
handle: jlong,
- is_deleted: jbooleanArray,
+ is_deleted: JBooleanArray,
) {
try_unwrap_or_throw(&e, |env| {
let reader = get_reader(handle)?;
- let is_deleted_array = unsafe { JBooleanArray::from_raw(is_deleted) };
- let len = env.get_array_length(&is_deleted_array)?;
+ let len = env.get_array_length(&is_deleted)?;
let mut buffer = MutableBuffer::from_len_zeroed(len as usize);
- env.get_boolean_array_region(&is_deleted_array, 0,
buffer.as_slice_mut())?;
+ env.get_boolean_array_region(&is_deleted, 0, buffer.as_slice_mut())?;
reader.set_is_deleted(buffer);
Ok(())
})
@@ -675,20 +660,16 @@ pub fn get_object_store_options(
pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_validateObjectStoreConfig(
e: JNIEnv,
_jclass: JClass,
- file_path: jstring,
- object_store_options: jobject,
+ file_path: JString,
+ object_store_options: JObject,
) {
- try_unwrap_or_throw(&e, |mut env| unsafe {
+ try_unwrap_or_throw(&e, |mut env| {
let session_config = SessionConfig::new();
let planner =
PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)),
0);
let session_ctx = planner.session_ctx();
- let path: String = env
- .get_string(&JString::from_raw(file_path))
- .unwrap()
- .into();
- let object_store_config =
- get_object_store_options(&mut env,
JObject::from_raw(object_store_options))?;
+ let path: String = env.get_string(&file_path).unwrap().into();
+ let object_store_config = get_object_store_options(&mut env,
object_store_options)?;
let (_, _) = prepare_object_store_with_configs(
session_ctx.runtime_env(),
path.clone(),
@@ -704,17 +685,17 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_validateObjec
pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_initRecordBatchReader(
e: JNIEnv,
_jclass: JClass,
- file_path: jstring,
+ file_path: JString,
file_size: jlong,
- starts: jlongArray,
- lengths: jlongArray,
- filter: jbyteArray,
- required_schema: jbyteArray,
- data_schema: jbyteArray,
- session_timezone: jstring,
+ starts: JLongArray,
+ lengths: JLongArray,
+ filter: JByteArray,
+ required_schema: JByteArray,
+ data_schema: JByteArray,
+ session_timezone: JString,
batch_size: jint,
case_sensitive: jboolean,
- object_store_options: jobject,
+ object_store_options: JObject,
key_unwrapper_obj: JObject,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| unsafe {
@@ -724,30 +705,23 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_initRecordBat
PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)),
0);
let session_ctx = planner.session_ctx();
- let path: String = env
- .get_string(&JString::from_raw(file_path))
- .unwrap()
- .into();
+ let path: String = env.get_string(&file_path).unwrap().into();
- let object_store_config =
- get_object_store_options(&mut env,
JObject::from_raw(object_store_options))?;
+ let object_store_config = get_object_store_options(&mut env,
object_store_options)?;
let (object_store_url, object_store_path) =
prepare_object_store_with_configs(
session_ctx.runtime_env(),
path.clone(),
&object_store_config,
)?;
- let required_schema_array = JByteArray::from_raw(required_schema);
- let required_schema_buffer =
env.convert_byte_array(&required_schema_array)?;
+ let required_schema_buffer = env.convert_byte_array(&required_schema)?;
let required_schema =
Arc::new(deserialize_schema(required_schema_buffer.as_bytes())?);
- let data_schema_array = JByteArray::from_raw(data_schema);
- let data_schema_buffer = env.convert_byte_array(&data_schema_array)?;
+ let data_schema_buffer = env.convert_byte_array(&data_schema)?;
let data_schema =
Arc::new(deserialize_schema(data_schema_buffer.as_bytes())?);
let data_filters = if !filter.is_null() {
- let filter_array = JByteArray::from_raw(filter);
- let filter_buffer = env.convert_byte_array(&filter_array)?;
+ let filter_buffer = env.convert_byte_array(&filter)?;
let filter_expr =
serde::deserialize_expr(filter_buffer.as_slice())?;
Some(vec![
planner.create_expr(&filter_expr, Arc::clone(&data_schema))?
@@ -755,21 +729,16 @@ pub unsafe extern "system" fn
Java_org_apache_comet_parquet_Native_initRecordBat
} else {
None
};
- let starts_array = JLongArray::from_raw(starts);
- let starts = env.get_array_elements(&starts_array,
ReleaseMode::NoCopyBack)?;
+ let starts = env.get_array_elements(&starts, ReleaseMode::NoCopyBack)?;
let starts = core::slice::from_raw_parts_mut(starts.as_ptr(),
starts.len());
- let lengths_array = JLongArray::from_raw(lengths);
- let lengths = env.get_array_elements(&lengths_array,
ReleaseMode::NoCopyBack)?;
+ let lengths = env.get_array_elements(&lengths,
ReleaseMode::NoCopyBack)?;
let lengths = core::slice::from_raw_parts_mut(lengths.as_ptr(),
lengths.len());
let file_groups =
get_file_groups_single_file(&object_store_path, file_size as u64,
starts, lengths);
- let session_timezone: String = env
- .get_string(&JString::from_raw(session_timezone))
- .unwrap()
- .into();
+ let session_timezone: String =
env.get_string(&session_timezone).unwrap().into();
// Handle key unwrapper for encrypted files
let encryption_enabled = if !key_unwrapper_obj.is_null() {
diff --git a/native/core/src/parquet/util/jni.rs
b/native/core/src/parquet/util/jni.rs
index d7d86a5e2..966351e71 100644
--- a/native/core/src/parquet/util/jni.rs
+++ b/native/core/src/parquet/util/jni.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use jni::{
errors::Result as JNIResult,
objects::{JObjectArray, JString},
- sys::{jboolean, jint, jobjectArray},
+ sys::{jboolean, jint},
JNIEnv,
};
@@ -50,7 +50,7 @@ pub fn convert_column_descriptor(
scale: jint,
time_unit: jint,
is_adjusted_utc: jboolean,
- jni_path: jobjectArray,
+ jni_path: JObjectArray,
) -> JNIResult<ColumnDescriptor> {
let physical_type = convert_physical_type(physical_type_id);
let type_length = fix_type_length(&physical_type, type_length);
@@ -132,8 +132,7 @@ impl TypePromotionInfo {
}
}
-fn convert_column_path(env: &mut JNIEnv, path: jobjectArray) ->
JNIResult<ColumnPath> {
- let path_array = unsafe { JObjectArray::from_raw(path) };
+fn convert_column_path(env: &mut JNIEnv, path_array: JObjectArray) ->
JNIResult<ColumnPath> {
let array_len = env.get_array_length(&path_array)?;
let mut res: Vec<String> = Vec::new();
for i in 0..array_len {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]