This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new fb01049d76 Example for using a separate threadpool for CPU bound work (try 3) (#16331) fb01049d76 is described below commit fb01049d76a47ccbf3c981db5499fd1baff9b1ff Author: Andrew Lamb <and...@nerdnetworks.org> AuthorDate: Mon Jun 23 06:03:40 2025 -0400 Example for using a separate threadpool for CPU bound work (try 3) (#16331) * Example for using a separate threadpool for CPU bound work (try 3) Pare back example * Update datafusion-examples/examples/thread_pools.rs Co-authored-by: Bruce Ritchie <bruce.ritc...@veeva.com> * Add a note about why the main Runtime is used for IO and not CPU * remove random thought * Update comments and simplify shutdown --------- Co-authored-by: Bruce Ritchie <bruce.ritc...@veeva.com> --- datafusion-examples/examples/thread_pools.rs | 350 +++++++++++++++++++++++++++ datafusion/core/src/lib.rs | 18 +- 2 files changed, 361 insertions(+), 7 deletions(-) diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs new file mode 100644 index 0000000000..bba56b2932 --- /dev/null +++ b/datafusion-examples/examples/thread_pools.rs @@ -0,0 +1,350 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example shows how to use separate thread pools (tokio [`Runtime`]s) to +//! run the IO and CPU intensive parts of DataFusion plans. +//! +//! # Background +//! +//! DataFusion, by default, plans and executes all operations (both CPU and IO) +//! on the same thread pool. This makes it fast and easy to get started, but +//! can cause issues when running at scale, especially when fetching and operating +//! on data directly from remote sources. +//! +//! Specifically, without configuration such as in this example, DataFusion +//! plans and executes everything on the same thread pool (Tokio Runtime), including +//! any I/O, such as reading Parquet files from remote object storage +//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse +//! workload can lead to issues described in the [Architecture section] such as +//! throttled network bandwidth (due to congestion control) and increased +//! latencies or timeouts while processing network messages. +//! +//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes +//! [`Runtime`]: tokio::runtime::Runtime + +use arrow::util::pretty::pretty_format_batches; +use datafusion::common::runtime::JoinSet; +use datafusion::error::Result; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::prelude::*; +use futures::stream::StreamExt; +use object_store::client::SpawnedReqwestConnector; +use object_store::http::HttpBuilder; +use std::sync::Arc; +use tokio::runtime::Handle; +use tokio::sync::Notify; +use url::Url; + +/// Normally, you don't need to worry about the details of the tokio +/// [`Runtime`], but for this example it is important to understand how the +/// [`Runtime`]s work. +/// +/// Each thread has a "current" runtime that is installed in a thread local +/// variable which is used by the `tokio::spawn` function.
+/// +/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it +/// as the "current" runtime in a thread local variable, on which any `async` +/// [`Future`], [`Stream`]s and [`Task`]s are run. +/// +/// This example uses the runtime created by [`tokio::main`] to do I/O and spawn +/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern +/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for +/// CPU bound tasks will often be simpler in larger applications, even though it +/// makes this example slightly more complex. +#[tokio::main] +async fn main() -> Result<()> { + // The first two examples read local files. Enabling the URL table feature + // lets us treat filenames as tables in SQL. + let ctx = SessionContext::new().enable_url_table(); + let sql = format!( + "SELECT * FROM '{}/alltypes_plain.parquet'", + datafusion::test_util::parquet_test_data() + ); + + // Run a query on the current runtime. Calling `await` means the future + // (in this case the `async` function and all spawned work in DataFusion + // plans) runs on the current runtime. + same_runtime(&ctx, &sql).await?; + + // Run the same query but this time on a different runtime. + // + // Since we call `await` here, the `async` function itself runs on the + // current runtime, but internally `different_runtime_basic` executes the + // DataFusion plan on a different Runtime. + different_runtime_basic(ctx, sql).await?; + + // Run the same query on a different runtime, including remote IO. + // + // NOTE: This is best practice for production systems. + different_runtime_advanced().await?; + + Ok(()) +} + +/// Run queries directly on the current tokio `Runtime` +/// +/// This is how most examples in DataFusion are written and works well for +/// development, local query processing, and non latency sensitive workloads.
+async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> { + // `.sql` is an async function as it may also do network + // I/O, for example to contact a remote catalog or do an object store LIST + let df = ctx.sql(sql).await?; + + // While many examples call `collect()` or `show()`, those methods buffer the + // results. Internally, DataFusion generates output one RecordBatch at a time. + + // Calling `execute_stream` returns a `SendableRecordBatchStream`. Depending + // on the plan, this may also do network I/O, for example to begin reading a + // parquet file from a remote object store. + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // `next()` drives the plan, incrementally producing new `RecordBatch`es + // using the current runtime. + // + // Perhaps somewhat non obviously, calling `next()` can also result in other + // tasks being spawned on the current runtime (e.g. for `RepartitionExec` to + // read data from each of its input partitions in parallel). + // + // Executing the plan using this pattern intermixes any IO and CPU intensive + // work on the same Runtime + while let Some(batch) = stream.next().await { + // Propagate formatting errors with `?` (as the other examples do) + // instead of panicking with `unwrap()` + println!("{}", pretty_format_batches(&[batch?])?); + } + Ok(()) +} + +/// Run queries on a **different** Runtime dedicated for CPU bound work +/// +/// This example is suitable for running DataFusion plans against local data +/// sources (e.g. files) and returning results to an async destination, as might +/// be done to return query results to a remote client. +/// +/// Production systems which also read data remotely or require very low latency +/// should follow the recommendations on [`different_runtime_advanced`] when +/// processing data from a remote source such as object storage.
+async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> { + // Since we are already in the context of runtime (installed by + // #[tokio::main]), we need a new Runtime (threadpool) for CPU bound tasks + let cpu_runtime = CpuRuntime::try_new()?; + + // Prepare a task that runs the plan on cpu_runtime and sends + // the results back to the original runtime via a channel. + let (tx, mut rx) = tokio::sync::mpsc::channel(2); + let driver_task = async move { + // Plan the query (which might require CPU work to evaluate statistics) + let df = ctx.sql(&sql).await?; + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // Calling `next()` to drive the plan in this task drives the + // execution from the cpu runtime the other thread pool + // + // NOTE any IO run by this plan (for example, reading from an + // `ObjectStore`) will be done on this new thread pool as well. + while let Some(batch) = stream.next().await { + if tx.send(batch).await.is_err() { + // error means dropped receiver, so nothing will get results anymore + return Ok(()); + } + } + Ok(()) as Result<()> + }; + + // Run the driver task on the cpu runtime. Use a JoinSet to + // ensure the spawned task is canceled on error/drop + let mut join_set = JoinSet::new(); + join_set.spawn_on(driver_task, cpu_runtime.handle()); + + // Retrieve the results in the original (IO) runtime. This requires only + // minimal work (pass pointers around). + while let Some(batch) = rx.recv().await { + println!("{}", pretty_format_batches(&[batch?])?); + } + + // wait for completion of the driver task + drain_join_set(join_set).await; + + Ok(()) +} + +/// Run CPU intensive work on a different runtime but do IO operations (object +/// store access) on the current runtime. 
+async fn different_runtime_advanced() -> Result<()> { + // In this example, we will query a file via https, reading + // the data directly from the plan + + // The current runtime (created by tokio::main) is used for IO + // + // Note this handle should be used for *ALL* remote IO operations in your + // systems, including remote catalog access, which is not included in this + // example. + let cpu_runtime = CpuRuntime::try_new()?; + let io_handle = Handle::current(); + + let ctx = SessionContext::new(); + + // By default, the HttpStore use the same runtime that calls `await` for IO + // operations. This means that if the DataFusion plan is called from the + // cpu_runtime, the HttpStore IO operations will *also* run on the CPU + // runtime, which will error. + // + // To avoid this, we use a `SpawnedReqwestConnector` to configure the + // `ObjectStore` to run the HTTP requests on the IO runtime. + let base_url = Url::parse("https://github.com").unwrap(); + let http_store = HttpBuilder::new() + .with_url(base_url.clone()) + // Use the io_runtime to run the HTTP requests. Without this line, + // you will see an error such as: + // A Tokio 1.x context was found, but IO is disabled. + .with_http_connector(SpawnedReqwestConnector::new(io_handle)) + .build()?; + + // Tell DataFusion to process `http://` urls with this wrapped object store + ctx.register_object_store(&base_url, Arc::new(http_store)); + + // As above, plan and execute the query on the cpu runtime. + let (tx, mut rx) = tokio::sync::mpsc::channel(2); + let driver_task = async move { + // Plan / execute the query + let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv"; + let df = ctx + .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5")) + .await?; + + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // Note you can do other non trivial CPU work on the results of the + // stream before sending it back to the original runtime. 
For example, + // calling a FlightDataEncoder to convert the results to flight messages + // to send over the network + + // send results, as above + while let Some(batch) = stream.next().await { + if tx.send(batch).await.is_err() { + return Ok(()); + } + } + Ok(()) as Result<()> + }; + + let mut join_set = JoinSet::new(); + join_set.spawn_on(driver_task, cpu_runtime.handle()); + while let Some(batch) = rx.recv().await { + println!("{}", pretty_format_batches(&[batch?])?); + } + + Ok(()) +} + +/// Waits for all tasks in the JoinSet to complete and reports any errors that +/// occurred. +/// +/// If we don't do this, any errors that occur in the task (such as IO errors) +/// are not reported. +async fn drain_join_set(mut join_set: JoinSet<Result<()>>) { + // retrieve any errors from the tasks + while let Some(result) = join_set.join_next().await { + match result { + Ok(Ok(())) => {} // task completed successfully + Ok(Err(e)) => eprintln!("Task failed: {e}"), // task failed + Err(e) => eprintln!("JoinSet error: {e}"), // JoinSet error + } + } +} + +/// Creates a Tokio [`Runtime`] for use with CPU bound tasks +/// +/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate +/// `Runtime` correctly is somewhat tricky. This structure manages the creation +/// and shutdown of a separate thread. +/// +/// # Notes +/// On drop, the thread will wait for all remaining tasks to complete. +/// +/// Depending on your application, more sophisticated shutdown logic may be +/// required, such as ensuring that no new tasks are added to the runtime. +/// +/// # Credits +/// This code is derived from code originally written for [InfluxDB 3.0] +/// +/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor +struct CpuRuntime { + /// Handle is the tokio structure for interacting with a Runtime. 
+ handle: Handle, + /// Signal to start shutting down + notify_shutdown: Arc<Notify>, + /// When thread is active, is Some + thread_join_handle: Option<std::thread::JoinHandle<()>>, +} + +impl Drop for CpuRuntime { + fn drop(&mut self) { + // Notify the thread to shutdown. + self.notify_shutdown.notify_one(); + // In a production system you also need to ensure your code stops adding + // new tasks to the underlying runtime after this point to allow the + // thread to complete its work and exit cleanly. + if let Some(thread_join_handle) = self.thread_join_handle.take() { + // If the thread is still running, we wait for it to finish + print!("Shutting down CPU runtime thread..."); + if let Err(e) = thread_join_handle.join() { + eprintln!("Error joining CPU runtime thread: {e:?}",); + } else { + println!("CPU runtime thread shutdown successfully."); + } + } + } +} + +impl CpuRuntime { + /// Create a new Tokio Runtime for CPU bound tasks + pub fn try_new() -> Result<Self> { + let cpu_runtime = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .build()?; + let handle = cpu_runtime.handle().clone(); + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_captured = Arc::clone(¬ify_shutdown); + + // The cpu_runtime runs and is dropped on a separate thread + let thread_join_handle = std::thread::spawn(move || { + cpu_runtime.block_on(async move { + notify_shutdown_captured.notified().await; + }); + // Note: cpu_runtime is dropped here, which will wait for all tasks + // to complete + }); + + Ok(Self { + handle, + notify_shutdown, + thread_join_handle: Some(thread_join_handle), + }) + } + + /// Return a handle suitable for spawning CPU bound tasks + /// + /// # Notes + /// + /// If a task spawned on this handle attempts to do IO, it will error with a + /// message such as: + /// + /// ```text + ///A Tokio 1.x context was found, but IO is disabled. 
+ /// ``` + pub fn handle(&self) -> &Handle { + &self.handle + } +} diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 989799b1f8..3e3d80caaa 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -510,17 +510,20 @@ //! initial development and processing local files, but it can lead to problems //! under load and/or when reading from network sources such as AWS S3. //! +//! ### Optimizing Latency: Throttled CPU / IO under Highly Concurrent Load +//! //! If your system does not fully utilize either the CPU or network bandwidth //! during execution, or you see significantly higher tail (e.g. p99) latencies //! responding to network requests, **it is likely you need to use a different -//! [`Runtime`] for CPU intensive DataFusion plans**. This effect can be especially -//! pronounced when running several queries concurrently. +//! [`Runtime`] for DataFusion plans**. The [thread_pools example] +//! has an example of how to do so. //! -//! As shown in the following figure, using the same [`Runtime`] for both CPU -//! intensive processing and network requests can introduce significant -//! delays in responding to those network requests. Delays in processing network -//! requests can and does lead network flow control to throttle the available -//! bandwidth in response. +//! As shown below, using the same [`Runtime`] for both CPU intensive processing +//! and network requests can introduce significant delays in responding to +//! those network requests. Delays in processing network requests can and does +//! lead network flow control to throttle the available bandwidth in response. +//! This effect can be especially pronounced when running multiple queries +//! concurrently. //! //! ```text //! Legend @@ -602,6 +605,7 @@ //! //! [Tokio]: https://tokio.rs //! [`Runtime`]: tokio::runtime::Runtime +//! [thread_pools example]: https://github.com/apache/datafusion/tree/main/datafusion-examples/examples/thread_pools.rs //! 
[`task`]: tokio::task //! [Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks]: https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/ //! [`RepartitionExec`]: physical_plan::repartition::RepartitionExec --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org