This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fb01049d76 Example for using a separate threadpool for CPU bound work 
(try 3) (#16331)
fb01049d76 is described below

commit fb01049d76a47ccbf3c981db5499fd1baff9b1ff
Author: Andrew Lamb <and...@nerdnetworks.org>
AuthorDate: Mon Jun 23 06:03:40 2025 -0400

    Example for using a separate threadpool for CPU bound work (try 3) (#16331)
    
    * Example for using a separate threadpool for CPU bound work (try 3)
    
    Pare back example
    
    * Update datafusion-examples/examples/thread_pools.rs
    
    Co-authored-by: Bruce Ritchie <bruce.ritc...@veeva.com>
    
    * Add a note about why the main Runtime is used for IO and not CPU
    
    * remove random thought
    
    * Update comments and simplify shutdown
    
    ---------
    
    Co-authored-by: Bruce Ritchie <bruce.ritc...@veeva.com>
---
 datafusion-examples/examples/thread_pools.rs | 350 +++++++++++++++++++++++++++
 datafusion/core/src/lib.rs                   |  18 +-
 2 files changed, 361 insertions(+), 7 deletions(-)

diff --git a/datafusion-examples/examples/thread_pools.rs 
b/datafusion-examples/examples/thread_pools.rs
new file mode 100644
index 0000000000..bba56b2932
--- /dev/null
+++ b/datafusion-examples/examples/thread_pools.rs
@@ -0,0 +1,350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example shows how to use separate thread pools (tokio [`Runtime`]))s 
to
+//! run the IO and CPU intensive parts of DataFusion plans.
+//!
+//! # Background
+//!
+//! DataFusion, by default, plans and executes all operations (both CPU and IO)
+//! on the same thread pool. This makes it fast and easy to get started, but
+//! can cause issues when running at scale, especially when fetching and 
operating
+//! on data directly from remote sources.
+//!
+//! Specifically, without configuration such as in this example, DataFusion
+//! plans and executes everything the same thread pool (Tokio Runtime), 
including
+//! any I/O, such as reading Parquet files from remote object storage
+//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
+//! workload can lead to issues described in the [Architecture section] such as
+//! throttled network bandwidth (due to congestion control) and increased
+//! latencies or timeouts while processing network messages.
+//!
+//! [Architecture section]: 
https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes
+
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::common::runtime::JoinSet;
+use datafusion::error::Result;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::prelude::*;
+use futures::stream::StreamExt;
+use object_store::client::SpawnedReqwestConnector;
+use object_store::http::HttpBuilder;
+use std::sync::Arc;
+use tokio::runtime::Handle;
+use tokio::sync::Notify;
+use url::Url;
+
+/// Normally, you don't need to worry about the details of the tokio
+/// [`Runtime`], but for this example it is important to understand how the
+/// [`Runtime`]s work.
+///
+/// Each thread has "current" runtime that is installed in a thread local
+/// variable which is used by the `tokio::spawn` function.
+///
+/// The `#[tokio::main]` macro  creates a [`Runtime`] and installs it as
+/// as the "current" runtime in a thread local variable, on which any `async`
+/// [`Future`], [`Stream]`s and [`Task]`s are run.
+///
+/// This example uses the runtime created by [`tokio::main`] to do I/O and 
spawn
+/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern
+/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
+/// CPU bound tasks will often be simpler in larger applications, even though 
it
+/// makes this example slightly more complex.
+#[tokio::main]
+async fn main() -> Result<()> {
+    // The first two examples read local files. Enabling the URL table feature
+    // lets us treat filenames as tables in SQL.
+    let ctx = SessionContext::new().enable_url_table();
+    let sql = format!(
+        "SELECT * FROM '{}/alltypes_plain.parquet'",
+        datafusion::test_util::parquet_test_data()
+    );
+
+    // Run a query on the current runtime. Calling `await` means the future
+    // (in this case the `async` function and all spawned work in DataFusion
+    // plans) on the current runtime.
+    same_runtime(&ctx, &sql).await?;
+
+    // Run the same query but this time on a different runtime.
+    //
+    // Since we call `await` here, the `async` function itself runs on the
+    // current runtime, but internally `different_runtime_basic` executes the
+    // DataFusion plan on a different Runtime.
+    different_runtime_basic(ctx, sql).await?;
+
+    // Run the same query on a different runtime, including remote IO.
+    //
+    // NOTE: This is best practice for production systems
+    different_runtime_advanced().await?;
+
+    Ok(())
+}
+
+/// Run queries directly on the current tokio `Runtime`
+///
+/// This is how most examples in DataFusion are written and works well for
+/// development, local query processing, and non latency sensitive workloads.
+async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
+    // Calling .sql is an async function as it may also do network
+    // I/O, for example to contact a remote catalog or do an object store LIST
+    let df = ctx.sql(sql).await?;
+
+    // While many examples call `collect` or `show()`, those methods buffers 
the
+    // results. Internally DataFusion generates output a RecordBatch at a time
+
+    // Calling `execute_stream` return a `SendableRecordBatchStream`. Depending
+    // on the plan, this may also do network I/O, for example to begin reading 
a
+    // parquet file from a remote object store.
+    let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
+
+    // `next()` drives the plan, incrementally producing new `RecordBatch`es
+    // using the current runtime.
+    //
+    // Perhaps somewhat non obviously, calling `next()` can also result in 
other
+    // tasks being spawned on the current runtime (e.g. for `RepartitionExec` 
to
+    // read data from each of its input partitions in parallel).
+    //
+    // Executing the plan using this pattern intermixes any IO and CPU 
intensive
+    // work on same Runtime
+    while let Some(batch) = stream.next().await {
+        println!("{}", pretty_format_batches(&[batch?]).unwrap());
+    }
+    Ok(())
+}
+
+/// Run queries on a **different** Runtime dedicated for CPU bound work
+///
+/// This example is suitable for running DataFusion plans against local data
+/// sources (e.g. files) and returning results to an async destination, as 
might
+/// be done to return query results to a remote client.
+///
+/// Production systems which also read data locally or require very low latency
+/// should follow the recommendations on [`different_runtime_advanced`] when
+/// processing data from a remote source such as object storage.
+async fn different_runtime_basic(ctx: SessionContext, sql: String) -> 
Result<()> {
+    // Since we are already in the context of runtime (installed by
+    // #[tokio::main]), we need a new Runtime (threadpool) for CPU bound tasks
+    let cpu_runtime = CpuRuntime::try_new()?;
+
+    // Prepare a task that runs the plan on cpu_runtime and sends
+    // the results back to the original runtime via a channel.
+    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
+    let driver_task = async move {
+        // Plan the query (which might require CPU work to evaluate statistics)
+        let df = ctx.sql(&sql).await?;
+        let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
+
+        // Calling `next()` to drive the plan in this task drives the
+        // execution from the cpu runtime the other thread pool
+        //
+        // NOTE any IO run by this plan (for example, reading from an
+        // `ObjectStore`) will be done on this new thread pool as well.
+        while let Some(batch) = stream.next().await {
+            if tx.send(batch).await.is_err() {
+                // error means dropped receiver, so nothing will get results 
anymore
+                return Ok(());
+            }
+        }
+        Ok(()) as Result<()>
+    };
+
+    // Run the driver task on the cpu runtime. Use a JoinSet to
+    // ensure the spawned task is canceled on error/drop
+    let mut join_set = JoinSet::new();
+    join_set.spawn_on(driver_task, cpu_runtime.handle());
+
+    // Retrieve the results in the original (IO) runtime. This requires only
+    // minimal work (pass pointers around).
+    while let Some(batch) = rx.recv().await {
+        println!("{}", pretty_format_batches(&[batch?])?);
+    }
+
+    // wait for completion of the driver task
+    drain_join_set(join_set).await;
+
+    Ok(())
+}
+
+/// Run CPU intensive work on a different runtime but do IO operations (object
+/// store access) on the current runtime.
+async fn different_runtime_advanced() -> Result<()> {
+    // In this example, we will query a file via https, reading
+    // the data directly from the plan
+
+    // The current runtime (created by tokio::main) is used for IO
+    //
+    // Note this handle should be used for *ALL* remote IO operations in your
+    // systems, including remote catalog access, which is not included in this
+    // example.
+    let cpu_runtime = CpuRuntime::try_new()?;
+    let io_handle = Handle::current();
+
+    let ctx = SessionContext::new();
+
+    // By default, the HttpStore use the same runtime that calls `await` for IO
+    // operations. This means that if the DataFusion plan is called from the
+    // cpu_runtime,  the HttpStore IO operations will *also* run on the CPU
+    // runtime, which will error.
+    //
+    // To avoid this, we use a `SpawnedReqwestConnector` to configure the
+    // `ObjectStore` to run the HTTP requests on the IO runtime.
+    let base_url = Url::parse("https://github.com";).unwrap();
+    let http_store = HttpBuilder::new()
+        .with_url(base_url.clone())
+        // Use the io_runtime to run the HTTP requests. Without this line,
+        // you will see an error such as:
+        // A Tokio 1.x context was found, but IO is disabled.
+        .with_http_connector(SpawnedReqwestConnector::new(io_handle))
+        .build()?;
+
+    // Tell DataFusion to process `http://` urls with this wrapped object store
+    ctx.register_object_store(&base_url, Arc::new(http_store));
+
+    // As above, plan and execute the query on the cpu runtime.
+    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
+    let driver_task = async move {
+        // Plan / execute the query
+        let url = 
"https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";;
+        let df = ctx
+            .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5"))
+            .await?;
+
+        let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
+
+        // Note you can do other non trivial CPU work on the results of the
+        // stream before sending it back to the original runtime. For example,
+        // calling a FlightDataEncoder to convert the results to flight 
messages
+        // to send over the network
+
+        // send results, as above
+        while let Some(batch) = stream.next().await {
+            if tx.send(batch).await.is_err() {
+                return Ok(());
+            }
+        }
+        Ok(()) as Result<()>
+    };
+
+    let mut join_set = JoinSet::new();
+    join_set.spawn_on(driver_task, cpu_runtime.handle());
+    while let Some(batch) = rx.recv().await {
+        println!("{}", pretty_format_batches(&[batch?])?);
+    }
+
+    Ok(())
+}
+
+/// Waits for all tasks in the JoinSet to complete and reports any errors that
+/// occurred.
+///
+/// If we don't do this, any errors that occur in the task (such as IO errors)
+/// are not reported.
+async fn drain_join_set(mut join_set: JoinSet<Result<()>>) {
+    // retrieve any errors from the tasks
+    while let Some(result) = join_set.join_next().await {
+        match result {
+            Ok(Ok(())) => {}                             // task completed 
successfully
+            Ok(Err(e)) => eprintln!("Task failed: {e}"), // task failed
+            Err(e) => eprintln!("JoinSet error: {e}"),   // JoinSet error
+        }
+    }
+}
+
+/// Creates a Tokio [`Runtime`] for use with CPU bound tasks
+///
+/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate
+/// `Runtime` correctly is somewhat tricky. This structure manages the creation
+/// and shutdown of a separate thread.
+///
+/// # Notes
+/// On drop, the thread will wait for all remaining tasks to complete.
+///
+/// Depending on your application, more sophisticated shutdown logic may be
+/// required, such as ensuring that no new tasks are added to the runtime.
+///
+/// # Credits
+/// This code is derived from code originally written for [InfluxDB 3.0]
+///
+/// [InfluxDB 3.0]: 
https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
+struct CpuRuntime {
+    /// Handle is the tokio structure for interacting with a Runtime.
+    handle: Handle,
+    /// Signal to start shutting down
+    notify_shutdown: Arc<Notify>,
+    /// When thread is active, is Some
+    thread_join_handle: Option<std::thread::JoinHandle<()>>,
+}
+
+impl Drop for CpuRuntime {
+    fn drop(&mut self) {
+        // Notify the thread to shutdown.
+        self.notify_shutdown.notify_one();
+        // In a production system you also need to ensure your code stops 
adding
+        // new tasks to the underlying runtime after this point to allow the
+        // thread to complete its work and exit cleanly.
+        if let Some(thread_join_handle) = self.thread_join_handle.take() {
+            // If the thread is still running, we wait for it to finish
+            print!("Shutting down CPU runtime thread...");
+            if let Err(e) = thread_join_handle.join() {
+                eprintln!("Error joining CPU runtime thread: {e:?}",);
+            } else {
+                println!("CPU runtime thread shutdown successfully.");
+            }
+        }
+    }
+}
+
+impl CpuRuntime {
+    /// Create a new Tokio Runtime for CPU bound tasks
+    pub fn try_new() -> Result<Self> {
+        let cpu_runtime = tokio::runtime::Builder::new_multi_thread()
+            .enable_time()
+            .build()?;
+        let handle = cpu_runtime.handle().clone();
+        let notify_shutdown = Arc::new(Notify::new());
+        let notify_shutdown_captured = Arc::clone(&notify_shutdown);
+
+        // The cpu_runtime runs and is dropped on a separate thread
+        let thread_join_handle = std::thread::spawn(move || {
+            cpu_runtime.block_on(async move {
+                notify_shutdown_captured.notified().await;
+            });
+            // Note: cpu_runtime is dropped here, which will wait for all tasks
+            // to complete
+        });
+
+        Ok(Self {
+            handle,
+            notify_shutdown,
+            thread_join_handle: Some(thread_join_handle),
+        })
+    }
+
+    /// Return a handle suitable for spawning CPU bound tasks
+    ///
+    /// # Notes
+    ///
+    /// If a task spawned on this handle attempts to do IO, it will error with 
a
+    /// message such as:
+    ///
+    /// ```text
+    ///A Tokio 1.x context was found, but IO is disabled.
+    /// ```
+    pub fn handle(&self) -> &Handle {
+        &self.handle
+    }
+}
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 989799b1f8..3e3d80caaa 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -510,17 +510,20 @@
 //! initial development and processing local files, but it can lead to problems
 //! under load and/or when reading from network sources such as AWS S3.
 //!
+//! ### Optimizing Latency: Throttled CPU / IO under Highly Concurrent Load
+//!
 //! If your system does not fully utilize either the CPU or network bandwidth
 //! during execution, or you see significantly higher tail (e.g. p99) latencies
 //! responding to network requests, **it is likely you need to use a different
-//! [`Runtime`] for CPU intensive DataFusion plans**. This effect can be 
especially
-//! pronounced when running several queries concurrently.
+//! [`Runtime`] for DataFusion plans**. The [thread_pools example]
+//! has  an example of how to do so.
 //!
-//! As shown in the following figure, using the same [`Runtime`] for both CPU
-//! intensive processing and network requests can introduce significant
-//! delays in responding to those network requests. Delays in processing 
network
-//! requests can and does lead network flow control to throttle the available
-//! bandwidth in response.
+//! As shown below, using the same [`Runtime`] for both CPU intensive 
processing
+//! and network requests can introduce significant delays in responding to
+//! those network requests. Delays in processing network requests can and does
+//! lead network flow control to throttle the available bandwidth in response.
+//! This effect can be especially pronounced when running multiple queries
+//! concurrently.
 //!
 //! ```text
 //!                                                                          
Legend
@@ -602,6 +605,7 @@
 //!
 //! [Tokio]:  https://tokio.rs
 //! [`Runtime`]: tokio::runtime::Runtime
+//! [thread_pools example]: 
https://github.com/apache/datafusion/tree/main/datafusion-examples/examples/thread_pools.rs
 //! [`task`]: tokio::task
 //! [Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks]: 
https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
 //! [`RepartitionExec`]: physical_plan::repartition::RepartitionExec


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to