This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dcd9add6 chore: Remove low-level ffi/jvm timers from native `ScanExec` (#2939)
3dcd9add6 is described below

commit 3dcd9add65aebe2621c053bb947361df75d0ea5b
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 19 06:54:54 2025 -0700

    chore: Remove low-level ffi/jvm timers from native `ScanExec` (#2939)
---
 native/core/src/execution/operators/scan.rs | 34 +----------------------------
 1 file changed, 1 insertion(+), 33 deletions(-)

diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index 1fedafbe8..2543705fb 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -77,10 +77,6 @@ pub struct ScanExec {
     metrics: ExecutionPlanMetricsSet,
     /// Baseline metrics
     baseline_metrics: BaselineMetrics,
-    /// Time waiting for JVM input plan to execute and return batches
-    jvm_fetch_time: Time,
-    /// Time spent in FFI
-    arrow_ffi_time: Time,
     /// Whether native code can assume ownership of batches that it receives
     arrow_ffi_safe: bool,
 }
@@ -95,8 +91,6 @@ impl ScanExec {
     ) -> Result<Self, CometError> {
         let metrics_set = ExecutionPlanMetricsSet::default();
         let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
-        let arrow_ffi_time = 
MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
-        let jvm_fetch_time = 
MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);
 
         // Build schema directly from data types since get_next now always 
unpacks dictionaries
         let schema = schema_from_data_types(&data_types);
@@ -119,8 +113,6 @@ impl ScanExec {
             cache,
             metrics: metrics_set,
             baseline_metrics,
-            jvm_fetch_time,
-            arrow_ffi_time,
             schema,
             arrow_ffi_safe,
         })
@@ -155,8 +147,6 @@ impl ScanExec {
                 self.exec_context_id,
                 self.input_source.as_ref().unwrap().as_obj(),
                 self.data_types.len(),
-                &self.jvm_fetch_time,
-                &self.arrow_ffi_time,
                 self.arrow_ffi_safe,
             )?;
             *current_batch = Some(next_batch);
@@ -172,8 +162,6 @@ impl ScanExec {
         exec_context_id: i64,
         iter: &JObject,
         num_cols: usize,
-        jvm_fetch_time: &Time,
-        arrow_ffi_time: &Time,
         arrow_ffi_safe: bool,
     ) -> Result<InputBatch, CometError> {
         if exec_context_id == TEST_EXEC_CONTEXT_ID {
@@ -189,15 +177,11 @@ impl ScanExec {
 
         let mut env = JVMClasses::get_env()?;
 
-        let mut timer = jvm_fetch_time.timer();
-
         let num_rows: i32 = unsafe {
             jni_call!(&mut env,
         comet_batch_iterator(iter).has_next() -> i32)?
         };
 
-        timer.stop();
-
         if num_rows == -1 {
             return Ok(InputBatch::EOF);
         }
@@ -206,11 +190,9 @@ impl ScanExec {
         // JVM via FFI
         // Selection vectors can be provided by, for instance, Iceberg to
         // remove rows that have been deleted.
-        let selection_indices_arrays =
-            Self::get_selection_indices(&mut env, iter, num_cols, 
jvm_fetch_time, arrow_ffi_time)?;
+        let selection_indices_arrays = Self::get_selection_indices(&mut env, 
iter, num_cols)?;
 
         // fetch batch data from JVM via FFI
-        let mut timer = arrow_ffi_time.timer();
         let (num_rows, array_addrs, schema_addrs) =
             Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?;
 
@@ -262,8 +244,6 @@ impl ScanExec {
             }
         }
 
-        timer.stop();
-
         // If selection was applied, determine the actual row count from the 
selected arrays
         let actual_num_rows = if let Some(ref selection_arrays) = 
selection_indices_arrays {
             if !selection_arrays.is_empty() {
@@ -332,21 +312,15 @@ impl ScanExec {
         env: &mut jni::JNIEnv,
         iter: &JObject,
         num_cols: usize,
-        jvm_fetch_time: &Time,
-        arrow_ffi_time: &Time,
     ) -> Result<Option<Vec<ArrayRef>>, CometError> {
         // Check if all columns have selection vectors
-        let mut timer = jvm_fetch_time.timer();
         let has_selection_vectors_result: jni::sys::jboolean = unsafe {
             jni_call!(env,
                 comet_batch_iterator(iter).has_selection_vectors() -> 
jni::sys::jboolean)?
         };
-        timer.stop();
         let has_selection_vectors = has_selection_vectors_result != 0;
 
         let selection_indices_arrays = if has_selection_vectors {
-            let mut timer = arrow_ffi_time.timer();
-
             // Allocate arrays for selection indices export (one per column)
             let mut indices_array_addrs = Vec::with_capacity(num_cols);
             let mut indices_schema_addrs = Vec::with_capacity(num_cols);
@@ -364,10 +338,7 @@ impl ScanExec {
             env.set_long_array_region(&indices_array_obj, 0, 
&indices_array_addrs)?;
             env.set_long_array_region(&indices_schema_obj, 0, 
&indices_schema_addrs)?;
 
-            timer.stop();
-
             // Export selection indices from JVM
-            let mut timer = jvm_fetch_time.timer();
             let _exported_count: i32 = unsafe {
                 jni_call!(env,
                     comet_batch_iterator(iter).export_selection_indices(
@@ -375,10 +346,8 @@ impl ScanExec {
                         
JValueGen::Object(JObject::from(indices_schema_obj).as_ref())
                     ) -> i32)?
             };
-            timer.stop();
 
             // Convert to ArrayRef for easier handling
-            let mut timer = arrow_ffi_time.timer();
             let mut selection_arrays = Vec::with_capacity(num_cols);
             for i in 0..num_cols {
                 let array_data =
@@ -391,7 +360,6 @@ impl ScanExec {
                     Rc::from_raw(indices_schema_addrs[i] as *const 
FFI_ArrowSchema);
                 }
             }
-            timer.stop();
 
             Some(selection_arrays)
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to