alamb commented on code in PR #14286:
URL: https://github.com/apache/datafusion/pull/14286#discussion_r2098291930


##########
datafusion-examples/examples/thread_pools_lib/dedicated_executor.rs:
##########
@@ -0,0 +1,1778 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio 
runtime.
+
+use crate::SendableRecordBatchStream;
+use async_trait::async_trait;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion_common::error::GenericError;
+use datafusion_common::DataFusionError;
+use futures::stream::BoxStream;
+use futures::{
+    future::{BoxFuture, Shared},
+    Future, FutureExt, Stream, StreamExt, TryFutureExt,
+};
+use log::{info, warn};
+use object_store::path::Path;
+use object_store::{
+    GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, 
ObjectMeta,
+    ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, 
UploadPart,
+};
+use std::cell::RefCell;
+use std::pin::Pin;
+use std::sync::RwLock;
+use std::task::{Context, Poll};
+use std::{fmt::Display, sync::Arc, time::Duration};
+use tokio::runtime::Builder;
+use tokio::task::JoinHandle;
+use tokio::{
+    runtime::Handle,
+    sync::{oneshot::error::RecvError, Notify},
+    task::JoinSet,
+};
+use tokio_stream::wrappers::ReceiverStream;
+
+/// Converts a tokio runtime [`Builder`] into a [`DedicatedExecutorBuilder`]
+/// by delegating to [`DedicatedExecutorBuilder::new_from_builder`].
+impl From<Builder> for DedicatedExecutorBuilder {
+    fn from(runtime_builder: Builder) -> Self {
+        DedicatedExecutorBuilder::new_from_builder(runtime_builder)
+    }
+}
+
+/// Manages a separate tokio [`Runtime`] (thread pool) for executing CPU bound
+/// tasks such as DataFusion `ExecutionPlans`.
+///
+/// See [`DedicatedExecutorBuilder`] for creating a new instance.
+///
+/// A `DedicatedExecutor` helps avoid issues when running IO and CPU bound 
tasks on the
+/// same thread pool by running futures (and any `tasks` that are
+/// `tokio::task::spawned` by them) on a separate tokio runtime.
+///
+/// `DedicatedExecutor`s can be `clone`ed and all clones share the same thread 
pool.
+///
+/// Since the primary use for a `DedicatedExecutor` is offloading CPU bound
+/// work, IO work can not be performed on tasks launched in the Executor.
+///
+/// To perform IO, see:
+/// - [`Self::spawn_io`]
+/// - [`Self::wrap_object_store_for_io`]
+///
+/// When [`DedicatedExecutorBuilder::build`] is called, a reference to the
+/// "current" tokio runtime will be stored and used, via 
[`register_io_runtime`] by all
+/// threads spawned by the executor. Any I/O done by threads in this
+/// [`DedicatedExecutor`] should use [`Self::spawn_io`], which will run them on the
+/// I/O runtime.
+///
+/// # TODO examples
+///
+/// # Background
+///
+/// Tokio has the notion of the "current" runtime, which runs the current 
future
+/// and any tasks spawned by it. Typically, this is the runtime created by
+/// `tokio::main` and is used for the main application logic and I/O handling
+///
+/// For CPU bound work, such as DataFusion plan execution, it is important to
+/// run on a separate thread pool to avoid blocking the I/O handling for 
extended
+/// periods of time in order to avoid long poll latencies (which decreases the
+/// throughput of small requests under concurrent load).
+///
+/// # IO Scheduling
+///
+/// I/O, such as network calls, should not be performed on the runtime managed
+/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running
+/// CPU tasks will not be preempted and can therefore starve servicing of other
+/// tasks. This manifests in long poll-latencies, where a task is ready to run
+/// but isn't being scheduled to run. For CPU-bound work this isn't a problem 
as
+/// there is no external party waiting on a response, however, for I/O tasks,
+/// long poll latencies can prevent timely servicing of IO, which can have a
+/// significant detrimental effect.
+///
+/// # Details
+///
+/// The worker thread priority is set to low so that such tasks do
+/// not starve other more important tasks (such as answering health checks)
+///
+/// Follows the example from stack overflow and spawns a new
+/// thread to install a Tokio runtime "context"
+/// <https://stackoverflow.com/questions/62536566>
+///
+/// # Troubleshooting
+///
+/// ## "No IO runtime registered. Call 
`register_io_runtime`/`register_current_runtime_for_io` in current thread!
+///
+/// This means that IO was attempted on a tokio runtime that was not registered
+/// for IO. One solution is to run the task using [DedicatedExecutor::spawn].
+///
+/// ## "Cannot drop a runtime in a context where blocking is not allowed"
+///
+/// If you try to use this structure from an async context you see something 
like
+/// thread 'test_builder_plan' panicked at 'Cannot
+/// drop a runtime in a context where blocking is not allowed. This
+/// happens when a runtime is dropped from within an asynchronous
+/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
+///
+/// # Notes
+/// This code is derived from code originally written for [InfluxDB 3.0]
+///
+/// [InfluxDB 3.0]: 
https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
+#[derive(Clone, Debug)]
+pub struct DedicatedExecutor {
+    /// State for managing Tokio Runtime Handle for CPU tasks.
+    ///
+    /// Held behind an `Arc`, so every clone of the executor shares (and can
+    /// shut down) the same state.
+    state: Arc<RwLock<State>>,
+}
+
+impl DedicatedExecutor {
+    /// Returns a new [`DedicatedExecutorBuilder`], which is used to create a
+    /// [`DedicatedExecutor`]
+    pub fn builder() -> DedicatedExecutorBuilder {
+        DedicatedExecutorBuilder::new()
+    }
+
+    /// Runs `task` (and any tasks it spawns) on the thread pool managed by
+    /// this `DedicatedExecutor`.
+    ///
+    /// See the struct documentation for more details.
+    ///
+    /// The task is handed to the tokio runtime immediately and competes for
+    /// the thread pool's resources with everything else running there.
+    ///
+    /// # Errors
+    ///
+    /// The returned future resolves to:
+    /// - [`JobError::WorkerGone`] if the executor has already been shut down
+    ///   (no runtime handle is left), or if the task ended without a result
+    ///   for any reason other than a panic
+    /// - [`JobError::Panic`] if the task panicked; `msg` carries the panic
+    ///   payload when it is a `String` or `&str`, and
+    ///   `"unknown internal error"` otherwise
+    ///
+    /// # Behavior on `Drop`
+    ///
+    /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when
+    /// it is dropped. Thus, you need to ensure the returned future lives until
+    /// it completes (call `await`) or you wish to cancel it.
+    pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>>
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        // Copy the handle out; the read guard is a temporary that is released
+        // at the end of this statement
+        let maybe_runtime = self
+            .state
+            .read()
+            .expect("lock not poisoned")
+            .handle
+            .clone();
+
+        let runtime = match maybe_runtime {
+            Some(runtime) => runtime,
+            None => return futures::future::err(JobError::WorkerGone).boxed(),
+        };
+
+        // A JoinSet aborts its tasks when dropped, which gives "cancel on drop"
+        let mut tasks = JoinSet::new();
+        tasks.spawn_on(task, &runtime);
+        async move {
+            let joined = tasks.join_next().await.expect("just spawned task");
+            match joined {
+                Ok(output) => Ok(output),
+                Err(join_error) => match join_error.try_into_panic() {
+                    Ok(payload) => {
+                        // Panic payloads are usually a `String` or a `&str`
+                        let msg = payload
+                            .downcast_ref::<String>()
+                            .cloned()
+                            .or_else(|| {
+                                payload.downcast_ref::<&str>().map(|s| s.to_string())
+                            })
+                            .unwrap_or_else(|| "unknown internal error".to_string());
+
+                        Err(JobError::Panic { msg })
+                    }
+                    Err(_) => Err(JobError::WorkerGone),
+                },
+            }
+        }
+        .boxed()
+    }
+
+    /// Signals shutdown of this executor and all of its clones.
+    ///
+    /// This only requests the shutdown and does not wait for it; use
+    /// [`join`](Self::join) to wait for completion.
+    pub fn shutdown(&self) {
+        let mut guard = self.state.write().expect("lock not poisoned");
+        // With the handle gone, every later `spawn` resolves to `WorkerGone`
+        drop(guard.handle.take());
+        // Tell the listener on `start_shutdown` to begin shutting down
+        guard.start_shutdown.notify_one();
+    }
+
+    /// Stops all subsequent task executions, and waits for the worker
+    /// thread to complete. Note this will shutdown all clones of this
+    /// `DedicatedExecutor` as well.
+    ///
+    /// Only the first call to `join` will actually wait for the
+    /// executing thread to complete. All other calls to join will
+    /// complete immediately.
+    ///
+    /// # Panic / Drop
+    /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just
+    /// use this behavior and NOT call [`join`](Self::join) manually during
+    /// [`Drop`] or panics because this might lead to another panic, see
+    /// <https://github.com/rust-lang/futures-rs/issues/2575>.
+    ///
+    /// Panics with "Thread died?" if the completion future resolves to an
+    /// error.
+    pub async fn join(&self) {
+        self.shutdown();
+
+        // Clone the completion future while the lock is held. The read guard
+        // is a temporary released at the end of this statement, so the lock
+        // is not held across the `await` below (avoiding deadlocks).
+        let completed = self
+            .state
+            .read()
+            .expect("lock not poisoned")
+            .completed_shutdown
+            .clone();
+
+        completed.await.expect("Thread died?")
+    }
+
+    /// Wraps `object_store` in an [`IoObjectStore`] so that its I/O work is
+    /// always performed on the IO_RUNTIME.
+    ///
+    /// The wrapper is built from a clone of this executor and the given store.
+    ///
+    /// Note that the returned object store will only work correctly if run on
+    /// this dedicated executor. If you try and use it on another executor, it
+    /// will panic with "no IO runtime registered" type error.
+    pub fn wrap_object_store_for_io(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Arc<IoObjectStore> {
+        let io_store = IoObjectStore::new(self.clone(), object_store);
+        Arc::new(io_store)
+    }
+
+    /// Runs the [`SendableRecordBatchStream`] on the CPU thread pool
+    ///
+    /// This is a convenience method around [`Self::run_cpu_stream`]: any
+    /// [`JobError`] from the dedicated executor is boxed, converted into a
+    /// [`DataFusionError`], and tagged with the context
+    /// "Running RecordBatchStream on DedicatedExecutor". The returned stream
+    /// reports the same schema as the input stream.
+    pub fn run_cpu_sendable_record_batch_stream(
+        &self,
+        stream: SendableRecordBatchStream,
+    ) -> SendableRecordBatchStream {
+        // Grab the schema before `stream` is moved into the executor
+        let schema = stream.schema();
+
+        let to_datafusion_error = |job_error: JobError| {
+            let boxed: GenericError = Box::new(job_error);
+            DataFusionError::from(boxed)
+                .context("Running RecordBatchStream on DedicatedExecutor")
+        };
+        let cpu_stream = self.run_cpu_stream(stream, to_datafusion_error);
+
+        Box::pin(RecordBatchStreamAdapter::new(schema, cpu_stream))
+    }
+
+    /// Runs a stream on the CPU thread pool
+    ///
+    /// The stream is polled by a task spawned via [`Self::spawn`]; each item
+    /// is sent back through a channel of capacity 1, so the producer waits
+    /// until the previous item has been taken before sending the next one.
+    ///
+    /// # Note
+    /// The stream must produce Results so that any errors on the dedicated
+    /// executor (like a panic or shutdown) can be communicated back.
+    ///
+    /// Must be called from within a tokio runtime: a forwarding task is
+    /// spawned on the current runtime.
+    ///
+    /// # Arguments:
+    /// - stream: the stream to run on this dedicated executor
+    /// - converter: a function that converts a [`JobError`] to the error type 
of the stream
+    pub fn run_cpu_stream<X, E, S, C>(
+        &self,
+        stream: S,
+        converter: C,
+    ) -> impl Stream<Item = Result<X, E>> + Send + 'static
+    where
+        X: Send + 'static,
+        E: Send + 'static,
+        S: Stream<Item = Result<X, E>> + Send + 'static,
+        C: Fn(JobError) -> E + Send + 'static,
+    {
+        let (tx, rx) = tokio::sync::mpsc::channel(1);
+
+        // make a copy to send any job error results back
+        let error_tx = tx.clone();
+
+        // This task will run on the CPU runtime
+        let task = self.spawn(async move {
+            // drive the stream forward on the CPU runtime, sending results
+            // back to the original (presumably IO) runtime
+            let mut stream = Box::pin(stream);
+            while let Some(result) = stream.next().await {
+                // try to send to the sender, if error means the
+                // receiver has been closed and we terminate early
+                if tx.send(result).await.is_err() {
+                    return;
+                }
+            }
+        });
+
+        // fire up a task on the current runtime which transfers results back
+        // from the CPU runtime to the calling runtime
+        //
+        // This task owns `task`, the future returned by `spawn`, so the CPU
+        // job is cancelled when this forwarding task is dropped or aborted.
+        let mut set = JoinSet::new();
+        set.spawn(async move {
+            if let Err(e) = task.await {
+                // error running task, try and report it back. An error sending
+                // means the receiver was dropped so there is nowhere to
+                // report errors. Thus ignored via ok()
+                error_tx.send(Err(converter(e))).await.ok();
+            }
+        });
+
+        // Keep the JoinSet alongside the receiver so the forwarding task lives
+        // exactly as long as the returned stream
+        StreamAndTask {
+            inner: ReceiverStream::new(rx),
+            set,
+        }
+    }
+
+    /// Runs a stream on the IO thread pool
+    ///
+    /// The stream is polled by a task started through
+    /// [`Self::spawn_io_static`]; each item is sent back through a channel of
+    /// capacity 1.
+    ///
+    /// The stream must produce Results so that any errors on the dedicated
+    /// executor (like a panic or shutdown) can be communicated back.
+    ///
+    /// Note this has a slightly different API compared to
+    /// [`Self::run_cpu_stream`] (there is no [`JobError`] converter).
+    /// NOTE(review): the original comment attributes this to the
+    /// DedicatedExecutor not monitoring "the CPU thread pool (that came
+    /// elsewhere)"; this presumably refers to the IO runtime -- confirm.
+    ///
+    /// NOTE(review): `spawn_io_static` panics when no IO runtime is registered
+    /// for the calling thread. Here that panic would happen inside the task
+    /// held by the `JoinSet`, whose result is never read, so the returned
+    /// stream would presumably just end without an error item -- confirm.
+    ///
+    /// # Arguments:
+    /// - stream: the stream to run on this dedicated executor
+    pub fn run_io_stream<X, E, S>(
+        &self,
+        stream: S,
+    ) -> impl Stream<Item = Result<X, E>> + Send + 'static
+    where
+        X: Send + 'static,
+        E: Send + 'static,
+        S: Stream<Item = Result<X, E>> + Send + 'static,
+    {
+        let (tx, rx) = tokio::sync::mpsc::channel(1);
+
+        // The outer task runs on the current runtime and only awaits the
+        // inner future, which `spawn_io_static` places on the IO runtime
+        let mut set = JoinSet::new();
+        set.spawn(Self::spawn_io_static(async move {
+            // drive the stream forward on the IO runtime, sending results
+            // back to the original runtime
+            let mut stream = Box::pin(stream);
+            while let Some(result) = stream.next().await {
+                // try to send to the sender, if error means the
+                // receiver has been closed and we terminate early
+                if tx.send(result).await.is_err() {
+                    return;
+                }
+            }
+        }));
+
+        // Keep the JoinSet alongside the receiver so the task lives exactly
+        // as long as the returned stream
+        StreamAndTask {
+            inner: ReceiverStream::new(rx),
+            set,
+        }
+    }
+
+    /// Registers `handle` as the IO runtime for this thread
+    ///
+    /// Passing `None` clears any previously registered handle.
+    ///
+    /// Users should not need to call this function as it is handled by
+    /// [`DedicatedExecutorBuilder`]
+    ///
+    /// # Notes
+    ///
+    /// This sets a thread-local variable (`IO_RUNTIME`), so the registration
+    /// is only visible on the thread that made this call.
+    ///
+    /// See [`spawn_io`](Self::spawn_io) for more details
+    pub fn register_io_runtime(handle: Option<Handle>) {
+        IO_RUNTIME.set(handle)
+    }
+
+    /// Runs `fut` on IO runtime of this DedicatedExecutor
+    ///
+    /// Note: this delegates to [`Self::spawn_io_static`] and does not read
+    /// `self`, so the runtime used is whichever IO runtime is registered for
+    /// the calling thread.
+    ///
+    /// # Panics
+    /// Panics if no IO runtime is registered for the calling thread.
+    pub async fn spawn_io<Fut>(&self, fut: Fut) -> Fut::Output
+    where
+        Fut: Future + Send + 'static,
+        Fut::Output: Send,
+    {
+        Self::spawn_io_static(fut).await
+    }
+
+    /// Runs `fut` on the IO runtime registered for the current thread (the
+    /// thread-local `IO_RUNTIME`) and waits for its output.
+    ///
+    /// When possible, it is preferred to use [`Self::spawn_io`]
+    ///
+    /// This function is provided, similarly to tokio's [`Handle::current()`], to
+    /// avoid having to thread a `DedicatedExecutor` throughout your program.
+    ///
+    /// # Panics
+    /// Needs an IO runtime [registered](Self::register_io_runtime) on the
+    /// calling thread; panics with "No IO runtime registered. ..." otherwise.
+    pub async fn spawn_io_static<Fut>(fut: Fut) -> Fut::Output
+    where
+        Fut: Future + Send + 'static,
+        Fut::Output: Send,
+    {
+        // Clone the handle out of the thread-local so the borrow ends before
+        // anything is awaited
+        let io_handle = IO_RUNTIME.with_borrow(|h| h.clone()).expect(
+            "No IO runtime registered. If you hit this panic, it likely \
+            means a DataFusion plan or other CPU bound work is running on \
+            a tokio threadpool used for IO. Try spawning the work using \
+            `DedicatedExecutor::spawn` or for tests \
+            `DedicatedExecutor::register_current_runtime_for_io`",
+        );
+        // NOTE(review): `DropGuard` is defined outside this view; presumably it
+        // aborts the spawned task if this future is dropped -- confirm.
+        DropGuard(io_handle.spawn(fut)).await
+    }
+}
+
+/// A wrapper around a receiver stream and task that ensures the inner
+/// task is cancelled on drop
+///
+/// Items are read from `inner`; `set` is never polled and is stored only so
+/// that it is dropped together with the stream.
+struct StreamAndTask<T> {
+    /// Receiving half of the channel that the background task sends into
+    inner: ReceiverStream<T>,
+    /// Task which produces no output. On drop the outstanding task is 
cancelled
+    #[expect(dead_code)]
+    set: JoinSet<()>,
+}
+
+impl<T> Stream for StreamAndTask<T> {
+    type Item = T;
+
+    /// Forwards the poll to the wrapped [`ReceiverStream`]; the `set` field
+    /// is not touched.
+    fn poll_next(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        Pin::new(&mut this.inner).poll_next(cx)
+    }
+}
+
+thread_local! {
+    /// Tokio runtime `Handle` for doing network (I/O) operations, see 
[`spawn_io`]
+    pub static IO_RUNTIME: RefCell<Option<Handle>> = const { 
RefCell::new(None) };

Review Comment:
   The reason it is a thread local / static is that the original location of 
this code (`influxdb_iox`) has several IO calls where we didn't have access to 
the `DedicatedExecutor` and we needed to avoid plumbing a reference down all 
the way.
   
   If you can do it with an inner reference I think that is much better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to