This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 3dcd9add6 chore: Remove low-level ffi/jvm timers from native `ScanExec` (#2939)
3dcd9add6 is described below
commit 3dcd9add65aebe2621c053bb947361df75d0ea5b
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 19 06:54:54 2025 -0700
chore: Remove low-level ffi/jvm timers from native `ScanExec` (#2939)
---
native/core/src/execution/operators/scan.rs | 34 +----------------------------
1 file changed, 1 insertion(+), 33 deletions(-)
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index 1fedafbe8..2543705fb 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -77,10 +77,6 @@ pub struct ScanExec {
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
- /// Time waiting for JVM input plan to execute and return batches
- jvm_fetch_time: Time,
- /// Time spent in FFI
- arrow_ffi_time: Time,
/// Whether native code can assume ownership of batches that it receives
arrow_ffi_safe: bool,
}
@@ -95,8 +91,6 @@ impl ScanExec {
) -> Result<Self, CometError> {
let metrics_set = ExecutionPlanMetricsSet::default();
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
- let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
- let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);
// Build schema directly from data types since get_next now always
unpacks dictionaries
let schema = schema_from_data_types(&data_types);
@@ -119,8 +113,6 @@ impl ScanExec {
cache,
metrics: metrics_set,
baseline_metrics,
- jvm_fetch_time,
- arrow_ffi_time,
schema,
arrow_ffi_safe,
})
@@ -155,8 +147,6 @@ impl ScanExec {
self.exec_context_id,
self.input_source.as_ref().unwrap().as_obj(),
self.data_types.len(),
- &self.jvm_fetch_time,
- &self.arrow_ffi_time,
self.arrow_ffi_safe,
)?;
*current_batch = Some(next_batch);
@@ -172,8 +162,6 @@ impl ScanExec {
exec_context_id: i64,
iter: &JObject,
num_cols: usize,
- jvm_fetch_time: &Time,
- arrow_ffi_time: &Time,
arrow_ffi_safe: bool,
) -> Result<InputBatch, CometError> {
if exec_context_id == TEST_EXEC_CONTEXT_ID {
@@ -189,15 +177,11 @@ impl ScanExec {
let mut env = JVMClasses::get_env()?;
- let mut timer = jvm_fetch_time.timer();
-
let num_rows: i32 = unsafe {
jni_call!(&mut env,
comet_batch_iterator(iter).has_next() -> i32)?
};
- timer.stop();
-
if num_rows == -1 {
return Ok(InputBatch::EOF);
}
@@ -206,11 +190,9 @@ impl ScanExec {
// JVM via FFI
// Selection vectors can be provided by, for instance, Iceberg to
// remove rows that have been deleted.
- let selection_indices_arrays =
- Self::get_selection_indices(&mut env, iter, num_cols, jvm_fetch_time, arrow_ffi_time)?;
+ let selection_indices_arrays = Self::get_selection_indices(&mut env, iter, num_cols)?;
// fetch batch data from JVM via FFI
- let mut timer = arrow_ffi_time.timer();
let (num_rows, array_addrs, schema_addrs) =
Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?;
@@ -262,8 +244,6 @@ impl ScanExec {
}
}
- timer.stop();
-
// If selection was applied, determine the actual row count from the
selected arrays
let actual_num_rows = if let Some(ref selection_arrays) =
selection_indices_arrays {
if !selection_arrays.is_empty() {
@@ -332,21 +312,15 @@ impl ScanExec {
env: &mut jni::JNIEnv,
iter: &JObject,
num_cols: usize,
- jvm_fetch_time: &Time,
- arrow_ffi_time: &Time,
) -> Result<Option<Vec<ArrayRef>>, CometError> {
// Check if all columns have selection vectors
- let mut timer = jvm_fetch_time.timer();
let has_selection_vectors_result: jni::sys::jboolean = unsafe {
jni_call!(env,
comet_batch_iterator(iter).has_selection_vectors() ->
jni::sys::jboolean)?
};
- timer.stop();
let has_selection_vectors = has_selection_vectors_result != 0;
let selection_indices_arrays = if has_selection_vectors {
- let mut timer = arrow_ffi_time.timer();
-
// Allocate arrays for selection indices export (one per column)
let mut indices_array_addrs = Vec::with_capacity(num_cols);
let mut indices_schema_addrs = Vec::with_capacity(num_cols);
@@ -364,10 +338,7 @@ impl ScanExec {
env.set_long_array_region(&indices_array_obj, 0,
&indices_array_addrs)?;
env.set_long_array_region(&indices_schema_obj, 0,
&indices_schema_addrs)?;
- timer.stop();
-
// Export selection indices from JVM
- let mut timer = jvm_fetch_time.timer();
let _exported_count: i32 = unsafe {
jni_call!(env,
comet_batch_iterator(iter).export_selection_indices(
@@ -375,10 +346,8 @@ impl ScanExec {
JValueGen::Object(JObject::from(indices_schema_obj).as_ref())
) -> i32)?
};
- timer.stop();
// Convert to ArrayRef for easier handling
- let mut timer = arrow_ffi_time.timer();
let mut selection_arrays = Vec::with_capacity(num_cols);
for i in 0..num_cols {
let array_data =
@@ -391,7 +360,6 @@ impl ScanExec {
Rc::from_raw(indices_schema_addrs[i] as *const
FFI_ArrowSchema);
}
}
- timer.stop();
Some(selection_arrays)
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]