This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f697d2789 perf: executePlan uses a channel to park executor task thread instead of yield_now() (#3553)
f697d2789 is described below

commit f697d27893bdf2cb70d2b6fddb14d9bc3d617dd6
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Feb 20 09:38:07 2026 -0500

    perf: executePlan uses a channel to park executor task thread instead of yield_now() (#3553)
---
 docs/source/contributor-guide/development.md |  73 +++++++++++++++++
 native/core/src/execution/jni_api.rs         | 112 +++++++++++++++++----------
 2 files changed, 144 insertions(+), 41 deletions(-)

diff --git a/docs/source/contributor-guide/development.md b/docs/source/contributor-guide/development.md
index 14a67ff84..b83f3174d 100644
--- a/docs/source/contributor-guide/development.md
+++ b/docs/source/contributor-guide/development.md
@@ -28,6 +28,79 @@ under the License.
 ├── spark      <- Spark integration
 ```
 
+## Threading Architecture
+
+Comet's native execution runs on a shared tokio multi-threaded runtime. Understanding this
+architecture is important because it affects how you write native operators and JVM callbacks.
+
+### How execution works
+
+Spark calls into native code via JNI from an **executor task thread**. There are two execution
+paths depending on whether the plan reads data from the JVM:
+
+**Async I/O path (no JVM data sources, e.g. Iceberg scans):** The DataFusion stream is spawned
+onto a tokio worker thread and batches are delivered to the executor thread via an `mpsc` channel.
+The executor thread parks in `blocking_recv()` until the next batch is ready. This avoids
+busy-polling on I/O-bound workloads.
+
+**JVM data source path (ScanExec present):** The executor thread calls `block_on()` and polls the
+DataFusion stream directly, interleaving `pull_input_batches()` calls on `Poll::Pending` to feed
+data from the JVM into ScanExec operators.
+
+In both cases, DataFusion operators execute on **tokio worker threads**, not on the Spark executor
+task thread. All Spark tasks on an executor share one tokio runtime.
+
+### Rules for native code
+
+**Do not use `thread_local!` or assume thread identity.** Tokio may run your operator's `poll`
+method on any worker thread, and may move it between threads across polls. Any state must live
+in the operator struct or be shared via `Arc`.
+
+**JNI calls work from any thread, but have overhead.** `JVMClasses::get_env()` calls
+`AttachCurrentThread`, which acquires JVM internal locks. The `AttachGuard` detaches the thread
+when dropped. Repeated attach/detach cycles on tokio workers add overhead, so avoid calling
+into the JVM on hot paths during stream execution.
+
+**Do not call `TaskContext.get()` from JVM callbacks during execution.** Spark's `TaskContext` is
+a `ThreadLocal` on the executor task thread. JVM methods invoked from tokio worker threads will
+see `null`. If you need task metadata, capture it at construction time (in `createPlan` or
+operator setup) and store it in the operator. See `CometTaskMemoryManager` for an example — it
+captures `TaskContext.get().taskMemoryManager()` in its constructor and uses the stored reference
+thereafter.
+
+**Memory pool operations call into the JVM.** `CometUnifiedMemoryPool` and `CometFairMemoryPool`
+call `acquireMemory()` / `releaseMemory()` via JNI whenever DataFusion operators grow or shrink
+memory reservations. This happens on whatever thread the operator is executing on. These calls
+are thread-safe (they use stored `GlobalRef`s, not thread-locals), but they do trigger
+`AttachCurrentThread`.
+
+**Scalar subqueries call into the JVM.** `Subquery::evaluate()` calls static methods on
+`CometScalarSubquery` via JNI. These use a static `HashMap`, not thread-locals, so they are
+safe from any thread.
+
+**Parquet encryption calls into the JVM.** `CometKeyRetriever::retrieve_key()` calls the JVM
+to unwrap decryption keys during Parquet reads. It uses a stored `GlobalRef` and a cached
+`JMethodID`, so it is safe from any thread.
+
+### The tokio runtime
+
+The runtime is created once per executor JVM in a `Lazy<Runtime>` static:
+
+- **Worker threads:** `num_cpus` by default, configurable via `COMET_WORKER_THREADS`
+- **Max blocking threads:** 512 by default, configurable via `COMET_MAX_BLOCKING_THREADS`
+- All async I/O (S3, HTTP, Parquet reads) runs on worker threads as non-blocking futures
+
+### Summary of what is safe and what is not
+
+| Pattern                                   | Safe?  | Notes                                    |
+| ----------------------------------------- | ------ | ---------------------------------------- |
+| `Arc<T>` shared across operators          | Yes    | Standard Rust thread safety              |
+| `JVMClasses::get_env()` from tokio worker | Yes    | Attaches thread to JVM automatically     |
+| `thread_local!` in operator code          | **No** | Tokio moves tasks between threads        |
+| `TaskContext.get()` in JVM callback       | **No** | Returns `null` on non-executor threads   |
+| Storing `JNIEnv` in an operator           | **No** | `JNIEnv` is thread-specific              |
+| Capturing state at plan creation time     | Yes    | Runs on executor thread, store in struct |
+
 ## Development Setup
 
 1. Make sure `JAVA_HOME` is set and point to JDK using [support matrix](../user-guide/latest/installation.md)
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 436e5e99c..bafde3a0c 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -29,7 +29,7 @@ use crate::{
 use arrow::array::{Array, RecordBatch, UInt32Array};
 use arrow::compute::{take, TakeOptions};
 use arrow::datatypes::DataType as ArrowDataType;
-use datafusion::common::ScalarValue;
+use datafusion::common::{Result as DataFusionResult, ScalarValue};
 use datafusion::execution::disk_manager::DiskManagerMode;
 use datafusion::execution::memory_pool::MemoryPool;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
@@ -73,6 +73,7 @@ use std::path::PathBuf;
 use std::time::{Duration, Instant};
 use std::{sync::Arc, task::Poll};
 use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
 
 use crate::execution::memory_pools::{
     create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config, MemoryPoolConfig,
@@ -136,6 +137,8 @@ struct ExecutionContext {
     pub input_sources: Vec<Arc<GlobalRef>>,
     /// The record batch stream to pull results from
     pub stream: Option<SendableRecordBatchStream>,
+    /// Receives batches from a spawned tokio task (async I/O path)
+    pub batch_receiver: Option<mpsc::Receiver<DataFusionResult<RecordBatch>>>,
     /// Native metrics
     pub metrics: Arc<GlobalRef>,
     // The interval in milliseconds to update metrics
@@ -287,6 +290,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
                 scans: vec![],
                 input_sources,
                 stream: None,
+                batch_receiver: None,
                 metrics,
                 metrics_update_interval,
                 metrics_last_update_time: Instant::now(),
@@ -530,21 +534,62 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
                 // Each Comet native execution corresponds to a single Spark partition,
                 // so we should always execute partition 0.
                 let stream = root_op.native_plan.execute(0, task_ctx)?;
-                exec_context.stream = Some(stream);
+
+                if exec_context.scans.is_empty() {
+                    // No JVM data sources — spawn onto tokio so the executor
+                    // thread parks in blocking_recv instead of busy-polling.
+                    //
+                    // Channel capacity of 2 allows the producer to work one batch
+                    // ahead while the consumer processes the current one via JNI,
+                    // without buffering excessive memory. Increasing this would
+                    // trade memory for latency hiding if JNI/FFI overhead dominates;
+                    // decreasing to 1 would serialize production and consumption.
+                    let (tx, rx) = mpsc::channel(2);
+                    let mut stream = stream;
+                    get_runtime().spawn(async move {
+                        while let Some(batch) = stream.next().await {
+                            if tx.send(batch).await.is_err() {
+                                break;
+                            }
+                        }
+                    });
+                    exec_context.batch_receiver = Some(rx);
+                } else {
+                    exec_context.stream = Some(stream);
+                }
             } else {
                 // Pull input batches
                 pull_input_batches(exec_context)?;
             }
 
-            // Enter the runtime once for the entire polling loop to avoid repeated
-            // Runtime::enter() overhead
+            if let Some(rx) = &mut exec_context.batch_receiver {
+                match rx.blocking_recv() {
+                    Some(Ok(batch)) => {
+                        update_metrics(&mut env, exec_context)?;
+                        return prepare_output(
+                            &mut env,
+                            array_addrs,
+                            schema_addrs,
+                            batch,
+                            exec_context.debug_native,
+                        );
+                    }
+                    Some(Err(e)) => {
+                        return Err(e.into());
+                    }
+                    None => {
+                        log_plan_metrics(exec_context, stage_id, partition);
+                        return Ok(-1);
+                    }
+                }
+            }
+
+            // ScanExec path: busy-poll to interleave JVM batch pulls with stream polling
             get_runtime().block_on(async {
                 loop {
-                    // Polling the stream.
                     let next_item = exec_context.stream.as_mut().unwrap().next();
                     let poll_output = poll!(next_item);
 
-                    // update metrics at interval
                     // Only check time every 100 polls to reduce syscall overhead
                     if let Some(interval) = exec_context.metrics_update_interval {
                         exec_context.poll_count_since_metrics_check += 1;
@@ -560,7 +605,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
 
                     match poll_output {
                         Poll::Ready(Some(output)) => {
-                            // prepare output for FFI transfer
                             return prepare_output(
                                 &mut env,
                                 array_addrs,
@@ -570,43 +614,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
                             );
                         }
                         Poll::Ready(None) => {
-                            // Reaches EOF of output.
-                            if exec_context.explain_native {
-                                if let Some(plan) = &exec_context.root_op {
-                                    let formatted_plan_str = DisplayableExecutionPlan::with_metrics(
-                                        plan.native_plan.as_ref(),
-                                    )
-                                    .indent(true);
-                                    info!(
-                                        "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
-                                    \n plan creation took {:?}:\
-                                    \n{formatted_plan_str:}",
-                                        plan.plan_id, stage_id, partition, exec_context.plan_creation_time
-                                    );
-                                }
-                            }
+                            log_plan_metrics(exec_context, stage_id, partition);
                             return Ok(-1);
                         }
-                        // A poll pending means the stream is not ready yet.
                         Poll::Pending => {
-                            if exec_context.scans.is_empty() {
-                                // Pure async I/O (e.g., IcebergScanExec, DataSourceExec)
-                                // Yield to let the executor drive I/O instead of busy-polling
-                                tokio::task::yield_now().await;
-                            } else {
-                                // Has ScanExec operators
-                                // Busy-poll to pull batches from JVM
-                                // TODO: Investigate if JNI calls are safe without block_in_place.
-                                // block_in_place prevents Tokio from migrating this task to another thread,
-                                // which is necessary because JNI env is thread-local. If we can guarantee
-                                // thread safety another way, we could remove this wrapper for better perf.
-                                tokio::task::block_in_place(|| {
-                                    pull_input_batches(exec_context)
-                                })?;
-                            }
-
-                            // Output not ready yet
-                            continue;
+                            // JNI call to pull batches from JVM into ScanExec operators.
+                            // block_in_place lets tokio move other tasks off this worker
+                            // while we wait for JVM data.
+                            tokio::task::block_in_place(|| pull_input_batches(exec_context))?;
                         }
                     }
                 }
@@ -648,6 +663,21 @@ fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> Come
     }
 }
 
+fn log_plan_metrics(exec_context: &ExecutionContext, stage_id: jint, partition: jint) {
+    if exec_context.explain_native {
+        if let Some(plan) = &exec_context.root_op {
+            let formatted_plan_str =
+                DisplayableExecutionPlan::with_metrics(plan.native_plan.as_ref()).indent(true);
+            info!(
+                "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
+                \n plan creation took {:?}:\
+                \n{formatted_plan_str:}",
+                plan.plan_id, stage_id, partition, exec_context.plan_creation_time
+            );
+        }
+    }
+}
+
 fn convert_datatype_arrays(
     env: &'_ mut JNIEnv<'_>,
     serialized_datatypes: JObjectArray,

