alamb commented on a change in pull request #1883: URL: https://github.com/apache/arrow-datafusion/pull/1883#discussion_r815975802
########## File path: ballista/rust/executor/src/executor_server.rs ########## @@ -261,6 +262,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T, let executor_server = self.executor_server.clone(); tokio::spawn(async move { info!("Starting the task runner pool"); + //TODO make it configurable Review comment: ```suggestion // Use a dedicated executor for CPU bound tasks so that the main tokio // executor can still answer requests even when under load //TODO make it configurable ``` ########## File path: ballista/rust/executor/src/cpu_bound_executor.rs ########## @@ -0,0 +1,376 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//Inspire by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/ + +//! This module contains a dedicated thread pool for running "cpu +//! 
intensive" workloads as query plans + +use log::warn; +use parking_lot::Mutex; +use std::{pin::Pin, sync::Arc}; +use tokio::sync::oneshot::Receiver; + +use futures::Future; + +/// The type of thing that the dedicated executor runs +type Task = Pin<Box<dyn Future<Output = ()> + Send>>; + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio runtime, like separate CPU-bound (execute a datafusion plan) tasks +/// from IO-bound tasks(heartbeats). Get more from the above blog. +#[derive(Clone)] +pub struct DedicatedExecutor { + state: Arc<Mutex<State>>, +} + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio Executor +struct State { + /// The number of threads in this pool + num_threads: usize, + + /// The name of the threads for this executor + thread_name: String, + + /// Channel for requests -- the dedicated executor takes requests + /// from here and runs them. + requests: Option<std::sync::mpsc::Sender<Task>>, + + /// The thread that is doing the work + thread: Option<std::thread::JoinHandle<()>>, +} + +/// The default worker priority (value passed to `libc::setpriority`); +const WORKER_PRIORITY: i32 = 10; + +impl std::fmt::Debug for DedicatedExecutor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = self.state.lock(); + + let mut d = f.debug_struct("DedicatedExecutor"); + + d.field("num_threads", &state.num_threads) + .field("thread_name", &state.thread_name); + + if state.requests.is_some() { + d.field("requests", &"Some(...)") + } else { + d.field("requests", &"None") + }; + + if state.thread.is_some() { + d.field("thread", &"Some(...)") + } else { + d.field("thread", &"None") + }; + + d.finish() + } +} + +impl DedicatedExecutor { + /// https://stackoverflow.com/questions/62536566 + /// Creates a new `DedicatedExecutor` with a dedicated tokio + /// runtime that is separate from the `[tokio::main]` threadpool. 
+ /// + /// The worker thread priority is set to low so that such tasks do + /// not starve other more important tasks (such as answering health checks) + /// + pub fn new(thread_name: impl Into<String>, num_threads: usize) -> Self { + let thread_name = thread_name.into(); + let name_copy = thread_name.to_string(); + + let (tx, rx) = std::sync::mpsc::channel(); + + //Cannot create a seperated tokio runtime in another tokio runtime, + //So use std::thread to spawn a thread + let thread = std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name(&name_copy) + .worker_threads(num_threads) + .on_thread_start(move || set_current_thread_priority(WORKER_PRIORITY)) + .build() + .expect("Creating tokio runtime"); + + // By entering the context, all calls to `tokio::spawn` go + // to this executor + let _guard = runtime.enter(); + + while let Ok(request) = rx.recv() { + // TODO feedback request status + tokio::task::spawn(request); + } + }); + + let state = State { + num_threads, + thread_name, + requests: Some(tx), + thread: Some(thread), + }; + + Self { + state: Arc::new(Mutex::new(state)), + } + } + + /// Runs the specified Future (and any tasks it spawns) on the + /// `DedicatedExecutor`. + /// + /// Currently all tasks are added to the tokio executor + /// immediately and compete for the threadpool's resources. 
+ pub fn spawn<T>(&self, task: T) -> Receiver<T::Output> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let (tx, rx) = tokio::sync::oneshot::channel(); + + // create a execution plan to spawn + let job = Box::pin(async move { + let task_output = task.await; + if tx.send(task_output).is_err() { + warn!("Spawned task output ignored: receiver dropped"); + } + }); + + let mut state = self.state.lock(); + + if let Some(requests) = &mut state.requests { + // would fail if someone has started shutdown + requests.send(job).ok(); + } else { + warn!("tried to schedule task on an executor that was shutdown"); + } + + rx + } + + /// signals shutdown of this executor and any Clones + #[allow(dead_code)] Review comment: I wonder if this `#[allow(dead_code)]` is really necessary? ########## File path: ballista/rust/executor/src/cpu_bound_executor.rs ########## @@ -0,0 +1,376 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//Inspire by https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/ + +//! This module contains a dedicated thread pool for running "cpu +//! 
intensive" workloads as query plans + +use log::warn; +use parking_lot::Mutex; +use std::{pin::Pin, sync::Arc}; +use tokio::sync::oneshot::Receiver; + +use futures::Future; + +/// The type of thing that the dedicated executor runs +type Task = Pin<Box<dyn Future<Output = ()> + Send>>; + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio runtime, like separate CPU-bound (execute a datafusion plan) tasks +/// from IO-bound tasks(heartbeats). Get more from the above blog. +#[derive(Clone)] +pub struct DedicatedExecutor { + state: Arc<Mutex<State>>, +} + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio Executor +struct State { + /// The number of threads in this pool + num_threads: usize, + + /// The name of the threads for this executor + thread_name: String, + + /// Channel for requests -- the dedicated executor takes requests + /// from here and runs them. + requests: Option<std::sync::mpsc::Sender<Task>>, + + /// The thread that is doing the work + thread: Option<std::thread::JoinHandle<()>>, +} + +/// The default worker priority (value passed to `libc::setpriority`); +const WORKER_PRIORITY: i32 = 10; + +impl std::fmt::Debug for DedicatedExecutor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = self.state.lock(); + + let mut d = f.debug_struct("DedicatedExecutor"); + + d.field("num_threads", &state.num_threads) + .field("thread_name", &state.thread_name); + + if state.requests.is_some() { + d.field("requests", &"Some(...)") + } else { + d.field("requests", &"None") + }; + + if state.thread.is_some() { + d.field("thread", &"Some(...)") + } else { + d.field("thread", &"None") + }; + + d.finish() + } +} + +impl DedicatedExecutor { + /// https://stackoverflow.com/questions/62536566 + /// Creates a new `DedicatedExecutor` with a dedicated tokio + /// runtime that is separate from the `[tokio::main]` threadpool. 
+ /// + /// The worker thread priority is set to low so that such tasks do + /// not starve other more important tasks (such as answering health checks) + /// + pub fn new(thread_name: impl Into<String>, num_threads: usize) -> Self { Review comment: FWIW I think @Darksonn and others have noted that a tokio `Handle` might also be able to be used here https://docs.rs/tokio/1.17.0/tokio/runtime/struct.Handle.html ########## File path: ballista/rust/executor/src/executor_server.rs ########## @@ -261,6 +262,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T, let executor_server = self.executor_server.clone(); tokio::spawn(async move { info!("Starting the task runner pool"); + //TODO make it configurable Review comment: maybe worth a ticket to track -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org