This is an automated email from the ASF dual-hosted git repository. mbutrovich pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 2955a2112 fix: zero Arrow Array offset before sending across FFI (#2052) 2955a2112 is described below commit 2955a211272ce27b3be279cdf1edd29fa362e982 Author: Matt Butrovich <mbutrov...@users.noreply.github.com> AuthorDate: Thu Jul 31 16:24:10 2025 -0400 fix: zero Arrow Array offset before sending across FFI (#2052) --- native/core/src/execution/jni_api.rs | 68 +++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a7ddce34f..a5564c1b4 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -18,28 +18,6 @@ //! Define JNI APIs which can be called from Java/Scala. use super::{serde, utils::SparkArrowConvert}; -use arrow::array::RecordBatch; -use arrow::datatypes::DataType as ArrowDataType; -use datafusion::execution::memory_pool::MemoryPool; -use datafusion::{ - execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv}, - physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream}, - prelude::{SessionConfig, SessionContext}, -}; -use futures::poll; -use jni::{ - errors::Result as JNIResult, - objects::{ - JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, JPrimitiveArray, JString, - ReleaseMode, - }, - sys::{jbyteArray, jint, jlong, jlongArray}, - JNIEnv, -}; -use std::path::PathBuf; -use std::time::{Duration, Instant}; -use std::{sync::Arc, task::Poll}; - use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, execution::{ @@ -48,19 +26,41 @@ use crate::{ }, jvm_bridge::{jni_new_global_ref, JVMClasses}, }; +use arrow::array::{Array, RecordBatch, UInt32Array}; +use arrow::compute::{take, TakeOptions}; +use arrow::datatypes::DataType as ArrowDataType; use datafusion::common::ScalarValue; use datafusion::execution::disk_manager::DiskManagerMode; +use datafusion::execution::memory_pool::MemoryPool; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::logical_expr::ScalarUDF; +use datafusion::{ + execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv}, + physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream}, + prelude::{SessionConfig, SessionContext}, +}; use datafusion_comet_proto::spark_operator::Operator; use datafusion_spark::function::math::expm1::SparkExpm1; +use futures::poll; use futures::stream::StreamExt; use jni::objects::JByteBuffer; use jni::sys::JNI_FALSE; +use jni::{ + errors::Result as JNIResult, + objects::{ + JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, JPrimitiveArray, JString, + ReleaseMode, + }, + sys::{jbyteArray, jint, jlong, jlongArray}, + JNIEnv, +}; use jni::{ objects::GlobalRef, sys::{jboolean, jdouble, jintArray, jobjectArray, jstring}, }; +use std::path::PathBuf; +use std::time::{Duration, Instant}; +use std::{sync::Arc, task::Poll}; use tokio::runtime::Runtime; use crate::execution::memory_pools::{ @@ -341,10 +341,28 @@ fn prepare_output( let mut i = 0; while i < results.len() { let array_ref = results.get(i).ok_or(CometError::IndexOutOfBounds(i))?; - array_ref - .to_data() - .move_to_spark(array_addrs[i], schema_addrs[i])?; + if array_ref.offset() != 0 { + // https://github.com/apache/datafusion-comet/issues/2051 + // Bug with non-zero offset FFI, so take to a new array which will have an offset of 0. + // We expect this to be a cold code path, hence the check_bounds: true and assert_eq. + let indices = UInt32Array::from((0..num_rows as u32).collect::<Vec<u32>>()); + let new_array = take( + array_ref, + &indices, + Some(TakeOptions { check_bounds: true }), + )?; + + assert_eq!(new_array.offset(), 0); + + new_array + .to_data() + .move_to_spark(array_addrs[i], schema_addrs[i])?; + } else { + array_ref + .to_data() + .move_to_spark(array_addrs[i], schema_addrs[i])?; + } i += 1; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org