This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 58dee739 minor: refactor prepare_output so that it does not require an ExecutionContext (#1194)
58dee739 is described below

commit 58dee739b6b8c3e7c6057e01c72307cdcff56ada
Author: Andy Grove <[email protected]>
AuthorDate: Sun Dec 22 12:25:35 2024 -0700

    minor: refactor prepare_output so that it does not require an ExecutionContext (#1194)
---
 native/core/src/execution/jni_api.rs | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index eb73675b..2c1a55f4 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -231,7 +231,7 @@ fn prepare_output(
     array_addrs: jlongArray,
     schema_addrs: jlongArray,
     output_batch: RecordBatch,
-    exec_context: &mut ExecutionContext,
+    validate: bool,
 ) -> CometResult<jlong> {
     let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
     let num_cols = env.get_array_length(&array_address_array)? as usize;
@@ -255,7 +255,7 @@ fn prepare_output(
         )));
     }
 
-    if exec_context.debug_native {
+    if validate {
         // Validate the output arrays.
         for array in results.iter() {
             let array_data = array.to_data();
@@ -275,9 +275,6 @@ fn prepare_output(
         i += 1;
     }
 
-    // Update metrics
-    update_metrics(env, exec_context)?;
-
     Ok(num_rows as jlong)
 }
 
@@ -356,22 +353,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
             let next_item = exec_context.stream.as_mut().unwrap().next();
             let poll_output = exec_context.runtime.block_on(async { poll!(next_item) });
 
+            // Update metrics
+            update_metrics(&mut env, exec_context)?;
+
             match poll_output {
                 Poll::Ready(Some(output)) => {
+                    // prepare output for FFI transfer
                     return prepare_output(
                         &mut env,
                         array_addrs,
                         schema_addrs,
                         output?,
-                        exec_context,
+                        exec_context.debug_native,
                     );
                 }
                 Poll::Ready(None) => {
                     // Reaches EOF of output.
-
-                    // Update metrics
-                    update_metrics(&mut env, exec_context)?;
-
                     if exec_context.explain_native {
                         if let Some(plan) = &exec_context.root_op {
                             let formatted_plan_str =
@@ -391,9 +388,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
                 // A poll pending means there are more than one blocking operators,
                 // we don't need go back-forth between JVM/Native. Just keeping polling.
                 Poll::Pending => {
-                    // Update metrics
-                    update_metrics(&mut env, exec_context)?;
-
                     // Pull input batches
                     pull_input_batches(exec_context)?;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to