This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new f697d2789 perf: executePlan uses a channel to park executor task thread instead of yield_now() (#3553)
f697d2789 is described below
commit f697d27893bdf2cb70d2b6fddb14d9bc3d617dd6
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Feb 20 09:38:07 2026 -0500
perf: executePlan uses a channel to park executor task thread instead of yield_now() (#3553)
---
docs/source/contributor-guide/development.md | 73 +++++++++++++++++
native/core/src/execution/jni_api.rs | 112 +++++++++++++++++----------
2 files changed, 144 insertions(+), 41 deletions(-)
diff --git a/docs/source/contributor-guide/development.md b/docs/source/contributor-guide/development.md
index 14a67ff84..b83f3174d 100644
--- a/docs/source/contributor-guide/development.md
+++ b/docs/source/contributor-guide/development.md
@@ -28,6 +28,79 @@ under the License.
├── spark <- Spark integration
```
+## Threading Architecture
+
+Comet's native execution runs on a shared tokio multi-threaded runtime. Understanding this
+architecture is important because it affects how you write native operators and JVM callbacks.
+### How execution works
+
+Spark calls into native code via JNI from an **executor task thread**. There are two execution
+paths depending on whether the plan reads data from the JVM:
+
+**Async I/O path (no JVM data sources, e.g. Iceberg scans):** The DataFusion stream is spawned
+onto a tokio worker thread and batches are delivered to the executor thread via an `mpsc` channel.
+The executor thread parks in `blocking_recv()` until the next batch is ready. This avoids
+busy-polling on I/O-bound workloads.
+
+**JVM data source path (ScanExec present):** The executor thread calls `block_on()` and polls the
+DataFusion stream directly, interleaving `pull_input_batches()` calls on `Poll::Pending` to feed
+data from the JVM into ScanExec operators.
+
+In both cases, DataFusion operators execute on **tokio worker threads**, not on the Spark executor
+task thread. All Spark tasks on an executor share one tokio runtime.
+
+### Rules for native code
+
+**Do not use `thread_local!` or assume thread identity.** Tokio may run your operator's `poll`
+method on any worker thread, and may move it between threads across polls. Any state must live
+in the operator struct or be shared via `Arc`.
+
+**JNI calls work from any thread, but have overhead.** `JVMClasses::get_env()` calls
+`AttachCurrentThread`, which acquires JVM internal locks. The `AttachGuard` detaches the thread
+when dropped. Repeated attach/detach cycles on tokio workers add overhead, so avoid calling
+into the JVM on hot paths during stream execution.
+
+**Do not call `TaskContext.get()` from JVM callbacks during execution.** Spark's `TaskContext` is
+a `ThreadLocal` on the executor task thread. JVM methods invoked from tokio worker threads will
+see `null`. If you need task metadata, capture it at construction time (in `createPlan` or
+operator setup) and store it in the operator. See `CometTaskMemoryManager` for an example — it
+captures `TaskContext.get().taskMemoryManager()` in its constructor and uses the stored reference
+thereafter.
+
+**Memory pool operations call into the JVM.** `CometUnifiedMemoryPool` and `CometFairMemoryPool`
+call `acquireMemory()` / `releaseMemory()` via JNI whenever DataFusion operators grow or shrink
+memory reservations. This happens on whatever thread the operator is executing on. These calls
+are thread-safe (they use stored `GlobalRef`s, not thread-locals), but they do trigger
+`AttachCurrentThread`.
+
+**Scalar subqueries call into the JVM.** `Subquery::evaluate()` calls static methods on
+`CometScalarSubquery` via JNI. These use a static `HashMap`, not thread-locals, so they are
+safe from any thread.
+
+**Parquet encryption calls into the JVM.** `CometKeyRetriever::retrieve_key()` calls the JVM
+to unwrap decryption keys during Parquet reads. It uses a stored `GlobalRef` and a cached
+`JMethodID`, so it is safe from any thread.
+
+### The tokio runtime
+
+The runtime is created once per executor JVM in a `Lazy<Runtime>` static:
+
+- **Worker threads:** `num_cpus` by default, configurable via `COMET_WORKER_THREADS`
+- **Max blocking threads:** 512 by default, configurable via `COMET_MAX_BLOCKING_THREADS`
+- All async I/O (S3, HTTP, Parquet reads) runs on worker threads as non-blocking futures
+
+### Summary of what is safe and what is not
+
+| Pattern                                   | Safe?  | Notes                                    |
+| ----------------------------------------- | ------ | ---------------------------------------- |
+| `Arc<T>` shared across operators          | Yes    | Standard Rust thread safety              |
+| `JVMClasses::get_env()` from tokio worker | Yes    | Attaches thread to JVM automatically     |
+| `thread_local!` in operator code          | **No** | Tokio moves tasks between threads        |
+| `TaskContext.get()` in JVM callback       | **No** | Returns `null` on non-executor threads   |
+| Storing `JNIEnv` in an operator           | **No** | `JNIEnv` is thread-specific              |
+| Capturing state at plan creation time     | Yes    | Runs on executor thread, store in struct |
+
## Development Setup
1. Make sure `JAVA_HOME` is set and points to a JDK from the [support matrix](../user-guide/latest/installation.md)
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 436e5e99c..bafde3a0c 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -29,7 +29,7 @@ use crate::{
use arrow::array::{Array, RecordBatch, UInt32Array};
use arrow::compute::{take, TakeOptions};
use arrow::datatypes::DataType as ArrowDataType;
-use datafusion::common::ScalarValue;
+use datafusion::common::{Result as DataFusionResult, ScalarValue};
use datafusion::execution::disk_manager::DiskManagerMode;
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
@@ -73,6 +73,7 @@ use std::path::PathBuf;
use std::time::{Duration, Instant};
use std::{sync::Arc, task::Poll};
use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
use crate::execution::memory_pools::{
create_memory_pool, handle_task_shared_pool_release,
parse_memory_pool_config, MemoryPoolConfig,
@@ -136,6 +137,8 @@ struct ExecutionContext {
pub input_sources: Vec<Arc<GlobalRef>>,
/// The record batch stream to pull results from
pub stream: Option<SendableRecordBatchStream>,
+ /// Receives batches from a spawned tokio task (async I/O path)
+ pub batch_receiver: Option<mpsc::Receiver<DataFusionResult<RecordBatch>>>,
/// Native metrics
pub metrics: Arc<GlobalRef>,
// The interval in milliseconds to update metrics
@@ -287,6 +290,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
scans: vec![],
input_sources,
stream: None,
+ batch_receiver: None,
metrics,
metrics_update_interval,
metrics_last_update_time: Instant::now(),
@@ -530,21 +534,62 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
        // Each Comet native execution corresponds to a single Spark partition,
// so we should always execute partition 0.
let stream = root_op.native_plan.execute(0, task_ctx)?;
- exec_context.stream = Some(stream);
+
+ if exec_context.scans.is_empty() {
+ // No JVM data sources — spawn onto tokio so the executor
+ // thread parks in blocking_recv instead of busy-polling.
+ //
+                // Channel capacity of 2 allows the producer to work one batch
+                // ahead while the consumer processes the current one via JNI,
+                // without buffering excessive memory. Increasing this would
+                // trade memory for latency hiding if JNI/FFI overhead dominates;
+                // decreasing to 1 would serialize production and consumption.
+ let (tx, rx) = mpsc::channel(2);
+ let mut stream = stream;
+ get_runtime().spawn(async move {
+ while let Some(batch) = stream.next().await {
+ if tx.send(batch).await.is_err() {
+ break;
+ }
+ }
+ });
+ exec_context.batch_receiver = Some(rx);
+ } else {
+ exec_context.stream = Some(stream);
+ }
} else {
// Pull input batches
pull_input_batches(exec_context)?;
}
-            // Enter the runtime once for the entire polling loop to avoid repeated
-            // Runtime::enter() overhead
+ if let Some(rx) = &mut exec_context.batch_receiver {
+ match rx.blocking_recv() {
+ Some(Ok(batch)) => {
+ update_metrics(&mut env, exec_context)?;
+ return prepare_output(
+ &mut env,
+ array_addrs,
+ schema_addrs,
+ batch,
+ exec_context.debug_native,
+ );
+ }
+ Some(Err(e)) => {
+ return Err(e.into());
+ }
+ None => {
+ log_plan_metrics(exec_context, stage_id, partition);
+ return Ok(-1);
+ }
+ }
+ }
+
+            // ScanExec path: busy-poll to interleave JVM batch pulls with stream polling
get_runtime().block_on(async {
loop {
- // Polling the stream.
                    let next_item = exec_context.stream.as_mut().unwrap().next();
let poll_output = poll!(next_item);
- // update metrics at interval
                    // Only check time every 100 polls to reduce syscall overhead
                    if let Some(interval) = exec_context.metrics_update_interval {
exec_context.poll_count_since_metrics_check += 1;
@@ -560,7 +605,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
match poll_output {
Poll::Ready(Some(output)) => {
- // prepare output for FFI transfer
return prepare_output(
&mut env,
array_addrs,
@@ -570,43 +614,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
);
}
Poll::Ready(None) => {
- // Reaches EOF of output.
- if exec_context.explain_native {
- if let Some(plan) = &exec_context.root_op {
-                                let formatted_plan_str = DisplayableExecutionPlan::with_metrics(
- plan.native_plan.as_ref(),
- )
- .indent(true);
- info!(
-                                    "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
- \n plan creation took {:?}:\
- \n{formatted_plan_str:}",
-                                    plan.plan_id, stage_id, partition, exec_context.plan_creation_time
- );
- }
- }
+                        log_plan_metrics(exec_context, stage_id, partition);
return Ok(-1);
}
- // A poll pending means the stream is not ready yet.
Poll::Pending => {
- if exec_context.scans.is_empty() {
-                            // Pure async I/O (e.g., IcebergScanExec, DataSourceExec)
-                            // Yield to let the executor drive I/O instead of busy-polling
- tokio::task::yield_now().await;
- } else {
- // Has ScanExec operators
- // Busy-poll to pull batches from JVM
-                            // TODO: Investigate if JNI calls are safe without block_in_place.
-                            // block_in_place prevents Tokio from migrating this task to another thread,
-                            // which is necessary because JNI env is thread-local. If we can guarantee
-                            // thread safety another way, we could remove this wrapper for better perf.
- tokio::task::block_in_place(|| {
- pull_input_batches(exec_context)
- })?;
- }
-
- // Output not ready yet
- continue;
+                        // JNI call to pull batches from JVM into ScanExec operators.
+                        // block_in_place lets tokio move other tasks off this worker
+                        // while we wait for JVM data.
+                        tokio::task::block_in_place(|| pull_input_batches(exec_context))?;
}
}
}
@@ -648,6 +663,21 @@ fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> Come
}
}
+fn log_plan_metrics(exec_context: &ExecutionContext, stage_id: jint, partition: jint) {
+ if exec_context.explain_native {
+ if let Some(plan) = &exec_context.root_op {
+ let formatted_plan_str =
+                DisplayableExecutionPlan::with_metrics(plan.native_plan.as_ref()).indent(true);
+ info!(
+                "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
+ \n plan creation took {:?}:\
+ \n{formatted_plan_str:}",
+                plan.plan_id, stage_id, partition, exec_context.plan_creation_time
+ );
+ }
+ }
+}
+
fn convert_datatype_arrays(
env: &'_ mut JNIEnv<'_>,
serialized_datatypes: JObjectArray,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]