This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2955a2112 fix: zero Arrow Array offset before sending across FFI (#2052)
2955a2112 is described below

commit 2955a211272ce27b3be279cdf1edd29fa362e982
Author: Matt Butrovich <mbutrov...@users.noreply.github.com>
AuthorDate: Thu Jul 31 16:24:10 2025 -0400

    fix: zero Arrow Array offset before sending across FFI (#2052)
---
 native/core/src/execution/jni_api.rs | 68 +++++++++++++++++++++++-------------
 1 file changed, 43 insertions(+), 25 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index a7ddce34f..a5564c1b4 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -18,28 +18,6 @@
 //! Define JNI APIs which can be called from Java/Scala.
 
 use super::{serde, utils::SparkArrowConvert};
-use arrow::array::RecordBatch;
-use arrow::datatypes::DataType as ArrowDataType;
-use datafusion::execution::memory_pool::MemoryPool;
-use datafusion::{
-    execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv},
-    physical_plan::{display::DisplayableExecutionPlan, 
SendableRecordBatchStream},
-    prelude::{SessionConfig, SessionContext},
-};
-use futures::poll;
-use jni::{
-    errors::Result as JNIResult,
-    objects::{
-        JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, 
JPrimitiveArray, JString,
-        ReleaseMode,
-    },
-    sys::{jbyteArray, jint, jlong, jlongArray},
-    JNIEnv,
-};
-use std::path::PathBuf;
-use std::time::{Duration, Instant};
-use std::{sync::Arc, task::Poll};
-
 use crate::{
     errors::{try_unwrap_or_throw, CometError, CometResult},
     execution::{
@@ -48,19 +26,41 @@ use crate::{
     },
     jvm_bridge::{jni_new_global_ref, JVMClasses},
 };
+use arrow::array::{Array, RecordBatch, UInt32Array};
+use arrow::compute::{take, TakeOptions};
+use arrow::datatypes::DataType as ArrowDataType;
 use datafusion::common::ScalarValue;
 use datafusion::execution::disk_manager::DiskManagerMode;
+use datafusion::execution::memory_pool::MemoryPool;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::logical_expr::ScalarUDF;
+use datafusion::{
+    execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv},
+    physical_plan::{display::DisplayableExecutionPlan, 
SendableRecordBatchStream},
+    prelude::{SessionConfig, SessionContext},
+};
 use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_spark::function::math::expm1::SparkExpm1;
+use futures::poll;
 use futures::stream::StreamExt;
 use jni::objects::JByteBuffer;
 use jni::sys::JNI_FALSE;
+use jni::{
+    errors::Result as JNIResult,
+    objects::{
+        JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, 
JPrimitiveArray, JString,
+        ReleaseMode,
+    },
+    sys::{jbyteArray, jint, jlong, jlongArray},
+    JNIEnv,
+};
 use jni::{
     objects::GlobalRef,
     sys::{jboolean, jdouble, jintArray, jobjectArray, jstring},
 };
+use std::path::PathBuf;
+use std::time::{Duration, Instant};
+use std::{sync::Arc, task::Poll};
 use tokio::runtime::Runtime;
 
 use crate::execution::memory_pools::{
@@ -341,10 +341,28 @@ fn prepare_output(
         let mut i = 0;
         while i < results.len() {
             let array_ref = 
results.get(i).ok_or(CometError::IndexOutOfBounds(i))?;
-            array_ref
-                .to_data()
-                .move_to_spark(array_addrs[i], schema_addrs[i])?;
 
+            if array_ref.offset() != 0 {
+                // https://github.com/apache/datafusion-comet/issues/2051
+                // Bug with non-zero offset FFI, so take to a new array which 
will have an offset of 0.
+                // We expect this to be a cold code path, hence the 
check_bounds: true and assert_eq.
+                let indices = UInt32Array::from((0..num_rows as 
u32).collect::<Vec<u32>>());
+                let new_array = take(
+                    array_ref,
+                    &indices,
+                    Some(TakeOptions { check_bounds: true }),
+                )?;
+
+                assert_eq!(new_array.offset(), 0);
+
+                new_array
+                    .to_data()
+                    .move_to_spark(array_addrs[i], schema_addrs[i])?;
+            } else {
+                array_ref
+                    .to_data()
+                    .move_to_spark(array_addrs[i], schema_addrs[i])?;
+            }
             i += 1;
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to