yjshen commented on code in PR #2226: URL: https://github.com/apache/arrow-datafusion/pull/2226#discussion_r863799676
########## datafusion/core/src/scheduler/mod.rs: ########## @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A [`Scheduler`] maintains a pool of dedicated worker threads on which +//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism] +//! and is designed to decouple the execution parallelism from the parallelism expressed in +//! the physical plan as partitions. +//! +//! # Implementation +//! +//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller +//! chunks called pipelines. Each pipeline may consist of one or more nodes from the +//! [`ExecutionPlan`] tree. +//! +//! The scheduler then maintains a list of pending [`Task`], that identify a partition within +//! a particular pipeline that may be able to make progress on some "morsel" of data. These +//! [`Task`] are then scheduled on the worker pool, with a preference for scheduling work +//! on a given "morsel" on the same thread that produced it. +//! +//! # Rayon +//! +//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing +//! scheduler optimised for CPU-bound workloads. 
Pipelines may exploit this fact, and use [rayon]'s +//! structured concurrency primitives to express additional parallelism that may be exploited +//! if there are idle threads available at runtime +//! +//! # Shutdown +//! +//! Queries scheduled on a [`Scheduler`] will run to completion even if the +//! [`Scheduler`] is dropped +//! +//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf +//! [rayon]: https://docs.rs/rayon/latest/rayon/ +//! +//! # Example +//! +//! ```rust +//! # use futures::TryStreamExt; +//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext}; +//! # use datafusion_scheduler::Scheduler; +//! +//! # #[tokio::main] +//! # async fn main() { +//! let scheduler = Scheduler::new(4); +//! let config = SessionConfig::new().with_target_partitions(4); +//! let context = SessionContext::with_config(config); +//! +//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap(); +//! let plan = context.sql("SELECT MIN(b) FROM example") +//! .await +//! .unwrap() +//! .create_physical_plan() +//! .await +//! .unwrap(); +//! +//! let task = context.task_ctx(); Review Comment: The variable name `task` is slightly misleading, since it could be confused with the pipeline's `Task`. ########## datafusion/core/src/scheduler/mod.rs: ########## @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A [`Scheduler`] maintains a pool of dedicated worker threads on which +//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism] +//! and is designed to decouple the execution parallelism from the parallelism expressed in +//! the physical plan as partitions. +//! +//! # Implementation +//! +//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller +//! chunks called pipelines. Each pipeline may consist of one or more nodes from the +//! [`ExecutionPlan`] tree. +//! +//! The scheduler then maintains a list of pending [`Task`], that identify a partition within +//! a particular pipeline that may be able to make progress on some "morsel" of data. These +//! [`Task`] are then scheduled on the worker pool, with a preference for scheduling work +//! on a given "morsel" on the same thread that produced it. +//! +//! # Rayon +//! +//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing +//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s +//! structured concurrency primitives to express additional parallelism that may be exploited +//! if there are idle threads available at runtime +//! +//! # Shutdown +//! +//! Queries scheduled on a [`Scheduler`] will run to completion even if the +//! [`Scheduler`] is dropped +//! +//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf +//! [rayon]: https://docs.rs/rayon/latest/rayon/ +//! +//! # Example +//! +//! ```rust +//! 
# use futures::TryStreamExt; +//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext}; +//! # use datafusion_scheduler::Scheduler; +//! +//! # #[tokio::main] +//! # async fn main() { +//! let scheduler = Scheduler::new(4); +//! let config = SessionConfig::new().with_target_partitions(4); +//! let context = SessionContext::with_config(config); +//! +//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap(); +//! let plan = context.sql("SELECT MIN(b) FROM example") +//! .await +//! .unwrap() +//! .create_physical_plan() +//! .await +//! .unwrap(); +//! +//! let task = context.task_ctx(); +//! let stream = scheduler.schedule(plan, task).unwrap(); +//! let scheduled: Vec<_> = stream.try_collect().await.unwrap(); +//! # } +//! ``` +//! + +use std::sync::Arc; + +use futures::stream::BoxStream; +use log::{debug, error}; + +use crate::error::Result; +use crate::execution::context::TaskContext; +use crate::physical_plan::ExecutionPlan; + +use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline}; +use task::{spawn_plan, Task}; + +use rayon::{ThreadPool, ThreadPoolBuilder}; + +pub use task::ExecutionResults; + +mod pipeline; +mod plan; +mod task; + +/// Builder for a [`Scheduler`] +#[derive(Debug)] +pub struct SchedulerBuilder { + inner: ThreadPoolBuilder, +} + +impl SchedulerBuilder { + /// Create a new [`SchedulerConfig`] with the provided number of threads + pub fn new(num_threads: usize) -> Self { + let builder = ThreadPoolBuilder::new() + .num_threads(num_threads) + .panic_handler(|p| error!("{}", format_worker_panic(p))) + .thread_name(|idx| format!("df-worker-{}", idx)); + + Self { inner: builder } + } + + /// Registers a custom panic handler + #[cfg(test)] + fn panic_handler<H>(self, panic_handler: H) -> Self + where + H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static, + { + Self { + inner: self.inner.panic_handler(panic_handler), + } + } + + /// Build a new [`Scheduler`] + fn 
build(self) -> Scheduler { + Scheduler { + pool: Arc::new(self.inner.build().unwrap()), + } + } +} + +/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a dedicated thread pool +pub struct Scheduler { + pool: Arc<ThreadPool>, +} + +impl Scheduler { + /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool + pub fn new(num_threads: usize) -> Self { + SchedulerBuilder::new(num_threads).build() + } + + /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`]. + /// + /// Returns a [`ExecutionResults`] that can be used to receive results as they are produced, + /// as a [`futures::Stream`] of [`RecordBatch`] + pub fn schedule( + &self, + plan: Arc<dyn ExecutionPlan>, + context: Arc<TaskContext>, + ) -> Result<ExecutionResults> { + let plan = PipelinePlanner::new(plan, context).build()?; + Ok(self.schedule_plan(plan)) + } + + /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`]. + pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults { + spawn_plan(plan, self.spawner()) + } + + fn spawner(&self) -> Spawner { + Spawner { + pool: self.pool.clone(), + } + } +} + +/// Formats a panic message for a worker +fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String { + let maybe_idx = rayon::current_thread_index(); + let worker: &dyn std::fmt::Display = match &maybe_idx { + Some(idx) => idx, + None => &"UNKNOWN", + }; + + let message = if let Some(msg) = panic.downcast_ref::<&str>() { + *msg + } else if let Some(msg) = panic.downcast_ref::<String>() { + msg.as_str() + } else { + "UNKNOWN" + }; + + format!("worker {} panicked with: {}", worker, message) +} + +/// Returns `true` if the current thread is a rayon worker thread +/// +/// Note: if there are multiple rayon pools, this will return `true` if the current thread +/// belongs to ANY rayon pool, even if this isn't a worker thread of a [`Scheduler`] instance +fn is_worker() -> bool { + 
rayon::current_thread_index().is_some() +} + +/// Spawn a [`Task`] onto the local workers thread pool +/// +/// There is no guaranteed order of execution, as workers may steal at any time. However, +/// `spawn_local` will append to the front of the current worker's queue, workers pop tasks from +/// the front of their queue, and steal tasks from the back of other workers queues +/// +/// The effect is that tasks spawned using `spawn_local` will typically be prioritised in +/// a LIFO order, however, this should not be relied upon +fn spawn_local(task: Task) { + // Verify is a worker thread to avoid creating a global pool + assert!(is_worker(), "must be called from a worker"); + rayon::spawn(|| task.do_work()) +} + +/// Spawn a [`Task`] onto the local workers thread pool with fifo ordering +/// +/// There is no guaranteed order of execution, as workers may steal at any time. However, +/// `spawn_local_fifo` will append to the back of the current worker's queue, workers pop tasks +/// from the front of their queue, and steal tasks from the back of other workers queues +/// +/// The effect is that tasks spawned using `spawn_local_fifo` will typically be prioritised +/// in a FIFO order, however, this should not be relied upon +fn spawn_local_fifo(task: Task) { + // Verify is a worker thread to avoid creating a global pool + assert!(is_worker(), "must be called from a worker"); + rayon::spawn_fifo(|| task.do_work()) +} + +#[derive(Debug, Clone)] +pub struct Spawner { Review Comment: Could you add a doc comment for `Spawner`? Missing docs and other warnings like this can be caught by running clippy: ``` cargo clippy --all-targets --features=scheduler --workspace -- -D warnings ``` ########## datafusion/core/src/scheduler/mod.rs: ########## @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A [`Scheduler`] maintains a pool of dedicated worker threads on which +//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism] +//! and is designed to decouple the execution parallelism from the parallelism expressed in +//! the physical plan as partitions. +//! +//! # Implementation +//! +//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller +//! chunks called pipelines. Each pipeline may consist of one or more nodes from the +//! [`ExecutionPlan`] tree. +//! +//! The scheduler then maintains a list of pending [`Task`], that identify a partition within +//! a particular pipeline that may be able to make progress on some "morsel" of data. These +//! [`Task`] are then scheduled on the worker pool, with a preference for scheduling work +//! on a given "morsel" on the same thread that produced it. +//! +//! # Rayon +//! +//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing +//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s +//! structured concurrency primitives to express additional parallelism that may be exploited +//! if there are idle threads available at runtime +//! +//! # Shutdown +//! +//! Queries scheduled on a [`Scheduler`] will run to completion even if the +//! [`Scheduler`] is dropped +//! +//! 
[Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf +//! [rayon]: https://docs.rs/rayon/latest/rayon/ +//! +//! # Example +//! +//! ```rust +//! # use futures::TryStreamExt; +//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext}; +//! # use datafusion_scheduler::Scheduler; +//! +//! # #[tokio::main] +//! # async fn main() { +//! let scheduler = Scheduler::new(4); +//! let config = SessionConfig::new().with_target_partitions(4); +//! let context = SessionContext::with_config(config); +//! +//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap(); +//! let plan = context.sql("SELECT MIN(b) FROM example") +//! .await +//! .unwrap() +//! .create_physical_plan() +//! .await +//! .unwrap(); Review Comment: nit: the four lines above are indented by one space too few. ########## datafusion/core/src/scheduler/pipeline/execution.rs: ########## @@ -0,0 +1,330 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use std::any::Any; +use std::collections::VecDeque; +use std::fmt::Formatter; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll, Waker}; + +use arrow::error::ArrowError; +use async_trait::async_trait; +use futures::{Stream, StreamExt, TryStreamExt}; +use parking_lot::Mutex; + +use crate::arrow::datatypes::SchemaRef; +use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; +use crate::error::Result; +use crate::execution::context::TaskContext; +use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::metrics::MetricsSet; +use crate::physical_plan::{ + displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, Statistics, +}; + +use crate::scheduler::pipeline::Pipeline; +use crate::scheduler::BoxStream; + +/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and +/// converts it to the push-based [`Pipeline`] interface +/// +/// Internally [`ExecutionPipeline`] is still pull-based which limits its parallelism +/// to that of its output partitioning, however, it provides full compatibility with +/// [`ExecutionPlan`] allowing full interoperability with the existing ecosystem +/// +/// Longer term we will likely want to introduce new traits that differentiate between +/// pipeline-able operators like filters, and pipeline-breakers like aggregations, and +/// are better aligned with a push-based execution model. Review Comment: 👍 ########## datafusion/core/src/scheduler/pipeline/mod.rs: ########## @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::task::{Context, Poll}; + +use arrow::record_batch::RecordBatch; + +use crate::error::Result; + +pub mod execution; +pub mod repartition; + +/// A push-based interface used by the scheduler to drive query execution +/// +/// A pipeline processes data from one or more input partitions, producing output +/// to one or more output partitions. As a [`Pipeline`] may drawn on input from +/// more than one upstream [`Pipeline`], input partitions are identified by both +/// a child index, and a partition index, whereas output partitions are only +/// identified by a partition index. +/// +/// This is not intended as an eventual replacement for the physical plan representation +/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that +/// parts of the physical plan are "compiled" into by the scheduler. +/// +/// # Eager vs Lazy Execution +/// +/// Whether computation is eagerly done on push, or lazily done on pull, is +/// intentionally left as an implementation detail of the [`Pipeline`] +/// +/// This allows flexibility to support the following different patterns, and potentially more: +/// +/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`] +/// and immediately wakes the corresponding output partition. +/// +/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon +/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when +/// the job completes. 
Order and non-order preserving variants are possible +/// +/// A merge pipeline which combines data from one or more input partitions into one or +/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes +/// any output partitions that may now be able to make progress. This may be none if +/// the operator is waiting on data from a different input partition +/// +/// An aggregation pipeline which combines data from one or more input partitions into +/// a single output partition. [`Pipeline::push`] would eagerly update the computed +/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output. Review Comment: Should this instead say that `close` marks the end of the input, and that `poll_partition` then flushes the aggregate state to the output? ########## datafusion/core/src/scheduler/pipeline/mod.rs: ########## @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use std::task::{Context, Poll}; + +use arrow::record_batch::RecordBatch; + +use crate::error::Result; + +pub mod execution; +pub mod repartition; + +/// A push-based interface used by the scheduler to drive query execution +/// +/// A pipeline processes data from one or more input partitions, producing output +/// to one or more output partitions. As a [`Pipeline`] may drawn on input from +/// more than one upstream [`Pipeline`], input partitions are identified by both +/// a child index, and a partition index, whereas output partitions are only +/// identified by a partition index. +/// +/// This is not intended as an eventual replacement for the physical plan representation +/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that +/// parts of the physical plan are "compiled" into by the scheduler. +/// +/// # Eager vs Lazy Execution +/// +/// Whether computation is eagerly done on push, or lazily done on pull, is +/// intentionally left as an implementation detail of the [`Pipeline`] +/// +/// This allows flexibility to support the following different patterns, and potentially more: +/// +/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`] +/// and immediately wakes the corresponding output partition. +/// +/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon +/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when +/// the job completes. Order and non-order preserving variants are possible +/// +/// A merge pipeline which combines data from one or more input partitions into one or +/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes +/// any output partitions that may now be able to make progress. 
This may be none if +/// the operator is waiting on data from a different input partition +/// +/// An aggregation pipeline which combines data from one or more input partitions into +/// a single output partition. [`Pipeline::push`] would eagerly update the computed +/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output. +/// It would also be possible to flush once the partial aggregates reach a certain size +/// +/// A partition-aware aggregation pipeline, which functions similarly to the above, but +/// computes aggregations per input partition, before combining these prior to flush. +/// +/// An async input pipeline, which has no inputs, and wakes the output partition +/// whenever new data is available +/// +/// A JIT compiled sequence of synchronous operators, that perform multiple operations +/// from the physical plan as a single [`Pipeline`]. Parallelized implementations +/// are also possible +/// +pub trait Pipeline: Send + Sync + std::fmt::Debug { + /// Push a [`RecordBatch`] to the given input partition + fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>; Review Comment: I suppose `child` refers to the `ExecutionPlan` children that provide input to the plan, but the term is a little vague in the pipeline context. Would `upstream`/`downstream` be more meaningful names for pipelines? ########## datafusion/core/src/scheduler/task.rs: ########## @@ -0,0 +1,497 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::stream::RecordBatchStreamAdapter; +use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; +use crate::scheduler::{ + is_worker, plan::PipelinePlan, spawn_local, spawn_local_fifo, RoutablePipeline, + Spawner, +}; +use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; +use arrow::record_batch::RecordBatch; +use futures::channel::mpsc; +use futures::task::ArcWake; +use futures::{ready, Stream, StreamExt}; +use log::{debug, trace}; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Weak}; +use std::task::{Context, Poll}; + +/// Spawns a [`PipelinePlan`] using the provided [`Spawner`] +pub fn spawn_plan(plan: PipelinePlan, spawner: Spawner) -> ExecutionResults { + debug!("Spawning pipeline plan: {:#?}", plan); + + let (senders, receivers) = (0..plan.output_partitions) + .map(|_| mpsc::unbounded()) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + let context = Arc::new(ExecutionContext { + spawner, + pipelines: plan.pipelines, + schema: plan.schema, + output: senders, + }); + + for (pipeline_idx, query_pipeline) in context.pipelines.iter().enumerate() { + for partition in 0..query_pipeline.pipeline.output_partitions() { + context.spawner.spawn(Task { + context: context.clone(), + waker: Arc::new(TaskWaker { + context: Arc::downgrade(&context), + wake_count: AtomicUsize::new(1), + pipeline: pipeline_idx, + partition, + }), + }); + } + } + + let partitions = receivers + .into_iter() + .map(|receiver| ExecutionResultStream { 
+ receiver: receiver, + context: context.clone(), + }) + .collect(); + + ExecutionResults { + streams: partitions, + context, + } +} + +/// A [`Task`] identifies an output partition within a given pipeline that may be able to +/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding +/// [`Task`] and distributes them amongst its worker threads. +pub struct Task { + /// Maintain a link to the [`ExecutionContext`] this is necessary to be able to + /// route the output of the partition to its destination + context: Arc<ExecutionContext>, + + /// A [`ArcWake`] that can be used to re-schedule this [`Task`] for execution + waker: Arc<TaskWaker>, +} + +impl std::fmt::Debug for Task { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let output = &self.context.pipelines[self.waker.pipeline].output; + + f.debug_struct("Task") + .field("pipeline", &self.waker.pipeline) + .field("partition", &self.waker.partition) + .field("output", &output) + .finish() + } +} + +impl Task { + fn handle_error( + &self, + partition: usize, + routable: &RoutablePipeline, + error: DataFusionError, + ) { + self.context.send_query_output(partition, Err(error)); + if let Some(link) = routable.output { + trace!( + "Closing pipeline: {:?}, partition: {}, due to error", + link, + self.waker.partition, + ); + + self.context.pipelines[link.pipeline] + .pipeline + .close(link.child, self.waker.partition); + } + } + + /// Call [`Pipeline::poll_partition`], attempting to make progress on query execution + pub fn do_work(self) { + assert!(is_worker(), "Task::do_work called outside of worker pool"); + if self.context.is_cancelled() { + return; + } + + // Capture the wake count prior to calling [`Pipeline::poll_partition`] + // this allows us to detect concurrent wake ups and handle them correctly + let wake_count = self.waker.wake_count.load(Ordering::SeqCst); + + let node = self.waker.pipeline; + let partition = self.waker.partition; + + let waker = 
futures::task::waker_ref(&self.waker); + let mut cx = Context::from_waker(&*waker); + + let pipelines = &self.context.pipelines; + let routable = &pipelines[node]; + match routable.pipeline.poll_partition(&mut cx, partition) { + Poll::Ready(Some(Ok(batch))) => { + trace!("Poll {:?}: Ok: {}", self, batch.num_rows()); + match routable.output { + Some(link) => { + trace!( + "Publishing batch to pipeline {:?} partition {}", + link, + partition + ); + + let r = pipelines[link.pipeline] + .pipeline + .push(batch, link.child, partition); + + if let Err(e) = r { + self.handle_error(partition, routable, e); + + // Return without rescheduling this output again + return; + } + } + None => { + trace!("Publishing batch to output"); + self.context.send_query_output(partition, Ok(batch)) + } + } + + // Reschedule this pipeline again + // + // We want to prioritise running tasks triggered by the most recent + // batch, so reschedule with FIFO ordering + // + // Note: We must schedule after we have routed the batch, otherwise + // we introduce a potential ordering race where the newly scheduled + // task runs before this task finishes routing the output + spawn_local_fifo(self); + } + Poll::Ready(Some(Err(e))) => { + trace!("Poll {:?}: Error: {:?}", self, e); + self.handle_error(partition, routable, e) + } + Poll::Ready(None) => { + trace!("Poll {:?}: None", self); + match routable.output { + Some(link) => { + trace!("Closing pipeline: {:?}, partition: {}", link, partition); + pipelines[link.pipeline] + .pipeline + .close(link.child, partition) + } + None => self.context.finish(partition), + } + } + Poll::Pending => { + trace!("Poll {:?}: Pending", self); + // Attempt to reset the wake count with the value obtained prior + // to calling [`Pipeline::poll_partition`]. 
+ // + // If this fails it indicates a wakeup was received whilst executing + // [`Pipeline::poll_partition`] and we should reschedule the task + let reset = self.waker.wake_count.compare_exchange( + wake_count, + 0, + Ordering::SeqCst, + Ordering::SeqCst, + ); + + if reset.is_err() { + trace!("Wakeup triggered whilst polling: {:?}", self); + spawn_local(self); + } + } + } + } +} + +/// The results of the execution of a query +pub struct ExecutionResults { + /// [`ExecutionResultStream`] for each partition of this query + streams: Vec<ExecutionResultStream>, + + /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early + context: Arc<ExecutionContext>, +} + +impl ExecutionResults { + /// Returns a [`SendableRecordBatchStream`] of this execution + /// + /// In the event of multiple output partitions, the output will be interleaved + pub fn stream(self) -> SendableRecordBatchStream { + let schema = self.context.schema.clone(); + Box::pin(RecordBatchStreamAdapter::new( + schema, + futures::stream::select_all(self.streams), + )) + } + + /// Returns a [`SendableRecordBatchStream`] for each partition of this execution + pub fn stream_partitioned(self) -> Vec<SendableRecordBatchStream> { + self.streams.into_iter().map(|s| Box::pin(s) as _).collect() + } +} + +/// A result stream for the execution of a query +struct ExecutionResultStream { + receiver: mpsc::UnboundedReceiver<Option<Result<RecordBatch>>>, + + /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early + context: Arc<ExecutionContext>, +} + +impl Stream for ExecutionResultStream { + type Item = arrow::error::Result<RecordBatch>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let opt = ready!(self.receiver.poll_next_unpin(cx)).flatten(); + Poll::Ready(opt.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e))))) + } +} + +impl RecordBatchStream for ExecutionResultStream { + fn schema(&self) -> SchemaRef { + 
self.context.schema.clone() + } +} + +/// The shared state of all [`Task`] created from the same [`PipelinePlan`] +#[derive(Debug)] +struct ExecutionContext { + /// Spawner for this query + spawner: Spawner, + + /// List of pipelines that belong to this query, pipelines are addressed + /// based on their index within this list + pipelines: Vec<RoutablePipeline>, + + /// Schema of this plans output + pub schema: SchemaRef, + + /// The output streams, per partition, for this query's execution + output: Vec<mpsc::UnboundedSender<Option<Result<RecordBatch>>>>, +} + +impl Drop for ExecutionContext { + fn drop(&mut self) { + debug!("ExecutionContext dropped"); + } +} + +impl ExecutionContext { + /// Returns `true` if this query has been dropped, specifically if the + /// stream returned by [`super::Scheduler::schedule`] has been dropped + fn is_cancelled(&self) -> bool { + self.output.iter().all(|x| x.is_closed()) + } + + /// Sends `output` to this query's output stream + fn send_query_output(&self, partition: usize, output: Result<RecordBatch>) { + let _ = self.output[partition].unbounded_send(Some(output)); + } + + /// Mark this partition as finished + fn finish(&self, partition: usize) { + let _ = self.output[partition].unbounded_send(None); + } +} + +struct TaskWaker { + /// Store a weak reference to the [`ExecutionContext`] to avoid reference cycles if this + /// [`Waker`] is stored within a [`Pipeline`] owned by the [`ExecutionContext`] + context: Weak<ExecutionContext>, + + /// A counter that stores the number of times this has been awoken + /// + /// A value > 0, implies the task is either in the ready queue or + /// currently being executed + /// + /// `TaskWaker::wake` always increments the `wake_count`, however, it only + /// re-enqueues the [`Task`] if the value prior to increment was 0 + /// + /// This ensures that a given [`Task`] is not enqueued multiple times + /// + /// We store an integer, as opposed to a boolean, so that wake ups that + /// occur 
during [`Pipeline::poll_partition`] can be detected and handled + /// after it has finished executing + /// + wake_count: AtomicUsize, + + /// The index of the pipeline within `query` to poll + pipeline: usize, + + /// The partition of the pipeline within `query` to poll + partition: usize, Review Comment: 👍 Wake up the waker and re-enqueue the task. ########## datafusion/core/src/scheduler/mod.rs: ########## @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A [`Scheduler`] maintains a pool of dedicated worker threads on which +//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism] +//! and is designed to decouple the execution parallelism from the parallelism expressed in +//! the physical plan as partitions. +//! +//! # Implementation +//! +//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller +//! chunks called pipelines. Each pipeline may consist of one or more nodes from the +//! [`ExecutionPlan`] tree. +//! +//! The scheduler then maintains a list of pending [`Task`], that identify a partition within +//! a particular pipeline that may be able to make progress on some "morsel" of data. These +//! 
[`Task`] are then scheduled on the worker pool, with a preference for scheduling work +//! on a given "morsel" on the same thread that produced it. +//! +//! # Rayon +//! +//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing +//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s +//! structured concurrency primitives to express additional parallelism that may be exploited +//! if there are idle threads available at runtime +//! +//! # Shutdown +//! +//! Queries scheduled on a [`Scheduler`] will run to completion even if the +//! [`Scheduler`] is dropped +//! +//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf +//! [rayon]: https://docs.rs/rayon/latest/rayon/ +//! +//! # Example +//! +//! ```rust +//! # use futures::TryStreamExt; +//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext}; +//! # use datafusion_scheduler::Scheduler; +//! +//! # #[tokio::main] +//! # async fn main() { +//! let scheduler = Scheduler::new(4); +//! let config = SessionConfig::new().with_target_partitions(4); +//! let context = SessionContext::with_config(config); +//! +//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap(); +//! let plan = context.sql("SELECT MIN(b) FROM example") +//! .await +//! .unwrap() +//! .create_physical_plan() +//! .await +//! .unwrap(); +//! +//! let task = context.task_ctx(); +//! let stream = scheduler.schedule(plan, task).unwrap(); +//! let scheduled: Vec<_> = stream.try_collect().await.unwrap(); +//! # } +//! ``` +//! 
+ +use std::sync::Arc; + +use futures::stream::BoxStream; +use log::{debug, error}; + +use crate::error::Result; +use crate::execution::context::TaskContext; +use crate::physical_plan::ExecutionPlan; + +use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline}; +use task::{spawn_plan, Task}; + +use rayon::{ThreadPool, ThreadPoolBuilder}; + +pub use task::ExecutionResults; + +mod pipeline; +mod plan; +mod task; + +/// Builder for a [`Scheduler`] +#[derive(Debug)] +pub struct SchedulerBuilder { + inner: ThreadPoolBuilder, +} + +impl SchedulerBuilder { + /// Create a new [`SchedulerConfig`] with the provided number of threads + pub fn new(num_threads: usize) -> Self { + let builder = ThreadPoolBuilder::new() + .num_threads(num_threads) + .panic_handler(|p| error!("{}", format_worker_panic(p))) + .thread_name(|idx| format!("df-worker-{}", idx)); + + Self { inner: builder } + } + + /// Registers a custom panic handler + #[cfg(test)] + fn panic_handler<H>(self, panic_handler: H) -> Self + where + H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static, + { + Self { + inner: self.inner.panic_handler(panic_handler), + } + } + + /// Build a new [`Scheduler`] + fn build(self) -> Scheduler { + Scheduler { + pool: Arc::new(self.inner.build().unwrap()), + } + } +} + +/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a dedicated thread pool +pub struct Scheduler { + pool: Arc<ThreadPool>, +} + +impl Scheduler { + /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool + pub fn new(num_threads: usize) -> Self { + SchedulerBuilder::new(num_threads).build() + } + + /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`]. 
+ /// + /// Returns a [`ExecutionResults`] that can be used to receive results as they are produced, + /// as a [`futures::Stream`] of [`RecordBatch`] + pub fn schedule( + &self, + plan: Arc<dyn ExecutionPlan>, + context: Arc<TaskContext>, + ) -> Result<ExecutionResults> { + let plan = PipelinePlanner::new(plan, context).build()?; + Ok(self.schedule_plan(plan)) + } + + /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`]. + pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults { + spawn_plan(plan, self.spawner()) + } + + fn spawner(&self) -> Spawner { + Spawner { + pool: self.pool.clone(), + } + } +} + +/// Formats a panic message for a worker +fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String { + let maybe_idx = rayon::current_thread_index(); + let worker: &dyn std::fmt::Display = match &maybe_idx { + Some(idx) => idx, + None => &"UNKNOWN", + }; + + let message = if let Some(msg) = panic.downcast_ref::<&str>() { + *msg + } else if let Some(msg) = panic.downcast_ref::<String>() { + msg.as_str() + } else { + "UNKNOWN" + }; + + format!("worker {} panicked with: {}", worker, message) +} + +/// Returns `true` if the current thread is a rayon worker thread +/// +/// Note: if there are multiple rayon pools, this will return `true` if the current thread +/// belongs to ANY rayon pool, even if this isn't a worker thread of a [`Scheduler`] instance +fn is_worker() -> bool { + rayon::current_thread_index().is_some() +} + +/// Spawn a [`Task`] onto the local workers thread pool +/// +/// There is no guaranteed order of execution, as workers may steal at any time. 
However, +/// `spawn_local` will append to the front of the current worker's queue, workers pop tasks from +/// the front of their queue, and steal tasks from the back of other workers queues Review Comment: 👍 TIL ########## datafusion/core/src/scheduler/pipeline/mod.rs: ########## @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::task::{Context, Poll}; + +use arrow::record_batch::RecordBatch; + +use crate::error::Result; + +pub mod execution; +pub mod repartition; + +/// A push-based interface used by the scheduler to drive query execution +/// +/// A pipeline processes data from one or more input partitions, producing output +/// to one or more output partitions. As a [`Pipeline`] may drawn on input from +/// more than one upstream [`Pipeline`], input partitions are identified by both +/// a child index, and a partition index, whereas output partitions are only +/// identified by a partition index. +/// +/// This is not intended as an eventual replacement for the physical plan representation +/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that +/// parts of the physical plan are "compiled" into by the scheduler. 
+/// +/// # Eager vs Lazy Execution +/// +/// Whether computation is eagerly done on push, or lazily done on pull, is +/// intentionally left as an implementation detail of the [`Pipeline`] +/// +/// This allows flexibility to support the following different patterns, and potentially more: +/// +/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`] +/// and immediately wakes the corresponding output partition. +/// +/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon +/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when +/// the job completes. Order and non-order preserving variants are possible +/// +/// A merge pipeline which combines data from one or more input partitions into one or +/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes +/// any output partitions that may now be able to make progress. This may be none if +/// the operator is waiting on data from a different input partition +/// +/// An aggregation pipeline which combines data from one or more input partitions into +/// a single output partition. [`Pipeline::push`] would eagerly update the computed +/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output. +/// It would also be possible to flush once the partial aggregates reach a certain size Review Comment: 👍 ########## datafusion/core/src/scheduler/mod.rs: ########## @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A [`Scheduler`] maintains a pool of dedicated worker threads on which +//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism] +//! and is designed to decouple the execution parallelism from the parallelism expressed in +//! the physical plan as partitions. +//! +//! # Implementation +//! +//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller +//! chunks called pipelines. Each pipeline may consist of one or more nodes from the +//! [`ExecutionPlan`] tree. +//! +//! The scheduler then maintains a list of pending [`Task`], that identify a partition within +//! a particular pipeline that may be able to make progress on some "morsel" of data. These +//! [`Task`] are then scheduled on the worker pool, with a preference for scheduling work +//! on a given "morsel" on the same thread that produced it. +//! +//! # Rayon +//! +//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing +//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s +//! structured concurrency primitives to express additional parallelism that may be exploited +//! if there are idle threads available at runtime +//! +//! # Shutdown +//! +//! Queries scheduled on a [`Scheduler`] will run to completion even if the +//! [`Scheduler`] is dropped +//! +//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf +//! [rayon]: https://docs.rs/rayon/latest/rayon/ +//! +//! # Example +//! +//! ```rust +//! 
# use futures::TryStreamExt; +//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext}; +//! # use datafusion_scheduler::Scheduler; +//! +//! # #[tokio::main] +//! # async fn main() { +//! let scheduler = Scheduler::new(4); +//! let config = SessionConfig::new().with_target_partitions(4); +//! let context = SessionContext::with_config(config); +//! +//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap(); +//! let plan = context.sql("SELECT MIN(b) FROM example") +//! .await +//! .unwrap() +//! .create_physical_plan() +//! .await +//! .unwrap(); +//! +//! let task = context.task_ctx(); +//! let stream = scheduler.schedule(plan, task).unwrap(); +//! let scheduled: Vec<_> = stream.try_collect().await.unwrap(); +//! # } +//! ``` +//! + +use std::sync::Arc; + +use futures::stream::BoxStream; +use log::{debug, error}; + +use crate::error::Result; +use crate::execution::context::TaskContext; +use crate::physical_plan::ExecutionPlan; + +use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline}; +use task::{spawn_plan, Task}; + +use rayon::{ThreadPool, ThreadPoolBuilder}; + +pub use task::ExecutionResults; + +mod pipeline; +mod plan; +mod task; + +/// Builder for a [`Scheduler`] +#[derive(Debug)] +pub struct SchedulerBuilder { + inner: ThreadPoolBuilder, +} + +impl SchedulerBuilder { + /// Create a new [`SchedulerConfig`] with the provided number of threads + pub fn new(num_threads: usize) -> Self { + let builder = ThreadPoolBuilder::new() + .num_threads(num_threads) + .panic_handler(|p| error!("{}", format_worker_panic(p))) + .thread_name(|idx| format!("df-worker-{}", idx)); + + Self { inner: builder } + } + + /// Registers a custom panic handler + #[cfg(test)] + fn panic_handler<H>(self, panic_handler: H) -> Self + where + H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static, + { + Self { + inner: self.inner.panic_handler(panic_handler), + } + } + + /// Build a new [`Scheduler`] + fn 
build(self) -> Scheduler { + Scheduler { + pool: Arc::new(self.inner.build().unwrap()), + } + } +} + +/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a dedicated thread pool +pub struct Scheduler { + pool: Arc<ThreadPool>, +} + +impl Scheduler { + /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool + pub fn new(num_threads: usize) -> Self { + SchedulerBuilder::new(num_threads).build() + } + + /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`]. + /// + /// Returns a [`ExecutionResults`] that can be used to receive results as they are produced, + /// as a [`futures::Stream`] of [`RecordBatch`] + pub fn schedule( + &self, + plan: Arc<dyn ExecutionPlan>, + context: Arc<TaskContext>, + ) -> Result<ExecutionResults> { + let plan = PipelinePlanner::new(plan, context).build()?; + Ok(self.schedule_plan(plan)) + } + + /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`]. + pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults { + spawn_plan(plan, self.spawner()) + } + + fn spawner(&self) -> Spawner { + Spawner { + pool: self.pool.clone(), + } + } +} + +/// Formats a panic message for a worker +fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String { + let maybe_idx = rayon::current_thread_index(); + let worker: &dyn std::fmt::Display = match &maybe_idx { + Some(idx) => idx, + None => &"UNKNOWN", + }; + + let message = if let Some(msg) = panic.downcast_ref::<&str>() { + *msg + } else if let Some(msg) = panic.downcast_ref::<String>() { + msg.as_str() + } else { + "UNKNOWN" + }; + + format!("worker {} panicked with: {}", worker, message) +} + +/// Returns `true` if the current thread is a rayon worker thread +/// +/// Note: if there are multiple rayon pools, this will return `true` if the current thread +/// belongs to ANY rayon pool, even if this isn't a worker thread of a [`Scheduler`] instance +fn is_worker() -> bool { + 
rayon::current_thread_index().is_some() +} + +/// Spawn a [`Task`] onto the local workers thread pool +/// +/// There is no guaranteed order of execution, as workers may steal at any time. However, +/// `spawn_local` will append to the front of the current worker's queue, workers pop tasks from +/// the front of their queue, and steal tasks from the back of other workers queues +/// +/// The effect is that tasks spawned using `spawn_local` will typically be prioritised in +/// a LIFO order, however, this should not be relied upon +fn spawn_local(task: Task) { + // Verify is a worker thread to avoid creating a global pool + assert!(is_worker(), "must be called from a worker"); + rayon::spawn(|| task.do_work()) +} + +/// Spawn a [`Task`] onto the local workers thread pool with fifo ordering +/// +/// There is no guaranteed order of execution, as workers may steal at any time. However, +/// `spawn_local_fifo` will append to the back of the current worker's queue, workers pop tasks +/// from the front of their queue, and steal tasks from the back of other workers queues +/// +/// The effect is that tasks spawned using `spawn_local_fifo` will typically be prioritised +/// in a FIFO order, however, this should not be relied upon +fn spawn_local_fifo(task: Task) { + // Verify is a worker thread to avoid creating a global pool + assert!(is_worker(), "must be called from a worker"); + rayon::spawn_fifo(|| task.do_work()) +} + +#[derive(Debug, Clone)] +pub struct Spawner { + pool: Arc<ThreadPool>, +} + +impl Spawner { + pub fn spawn(&self, task: Task) { + debug!("Spawning {:?} to any worker", task); + self.pool.spawn(move || task.do_work()); + } +} + +#[cfg(test)] +mod tests { + use arrow::util::pretty::pretty_format_batches; + use std::ops::Range; + use std::panic::panic_any; + + use futures::{StreamExt, TryStreamExt}; + use log::info; + use rand::distributions::uniform::SampleUniform; + use rand::{thread_rng, Rng}; + + use crate::arrow::array::{ArrayRef, PrimitiveArray}; + 
use crate::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type}; + use crate::arrow::record_batch::RecordBatch; + use crate::datasource::{MemTable, TableProvider}; + use crate::physical_plan::displayable; + use crate::prelude::{SessionConfig, SessionContext}; + + use super::*; + + fn generate_primitive<T, R>( + rng: &mut R, + len: usize, + valid_percent: f64, + range: Range<T::Native>, + ) -> ArrayRef + where + T: ArrowPrimitiveType, + T::Native: SampleUniform, + R: Rng, + { + Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| { + rng.gen_bool(valid_percent) + .then(|| rng.gen_range(range.clone())) + }))) + } + + fn generate_batch<R: Rng>( + rng: &mut R, + row_count: usize, + id_offset: i32, + ) -> RecordBatch { + let id_range = id_offset..(row_count as i32 + id_offset); + let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 0..1000); + let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0. ..1000.); + let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range); + + RecordBatch::try_from_iter_with_nullable([ + ("a", a, true), + ("b", b, true), + ("id", Arc::new(id), false), + ]) + .unwrap() + } + + const BATCHES_PER_PARTITION: usize = 20; + const ROWS_PER_BATCH: usize = 100; + const NUM_PARTITIONS: usize = 2; + + fn make_batches() -> Vec<Vec<RecordBatch>> { + let mut rng = thread_rng(); + + let mut id_offset = 0; + + (0..NUM_PARTITIONS) + .map(|_| { + (0..BATCHES_PER_PARTITION) + .map(|_| { + let batch = generate_batch(&mut rng, ROWS_PER_BATCH, id_offset); + id_offset += ROWS_PER_BATCH as i32; + batch + }) + .collect() + }) + .collect() + } + + fn make_provider() -> Arc<dyn TableProvider> { + let batches = make_batches(); + let schema = batches.first().unwrap().first().unwrap().schema(); + Arc::new(MemTable::try_new(schema, make_batches()).unwrap()) + } + + fn init_logging() { + let _ = env_logger::builder().is_test(true).try_init(); + } + + #[tokio::test] + async fn test_simple() { + init_logging(); + + let scheduler = 
Scheduler::new(4); + + let config = SessionConfig::new().with_target_partitions(4); + let context = SessionContext::with_config(config); + + context.register_table("table1", make_provider()).unwrap(); + context.register_table("table2", make_provider()).unwrap(); + + let queries = [ + "select * from table1 order by id", + "select * from table1 where table1.a > 100 order by id", + "select distinct a from table1 where table1.b > 100 order by a", + "select * from table1 join table2 on table1.id = table2.id order by table1.id", + "select id from table1 union all select id from table2 order by id", + "select id from table1 union all select id from table2 where a > 100 order by id", + "select id, b from (select id, b from table1 union all select id, b from table2 where a > 100 order by id) as t where b > 10 order by id, b", + "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order by id", + "select count(*) from table1 where table1.a > 4", Review Comment: It would be helpful to include one or more example SQL queries, together with the execution pipelines they are broken into, in the top-level module documentation. ########## datafusion/core/src/scheduler/pipeline/mod.rs: ########## @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use std::task::{Context, Poll}; + +use arrow::record_batch::RecordBatch; + +use crate::error::Result; + +pub mod execution; +pub mod repartition; + +/// A push-based interface used by the scheduler to drive query execution +/// +/// A pipeline processes data from one or more input partitions, producing output +/// to one or more output partitions. As a [`Pipeline`] may drawn on input from +/// more than one upstream [`Pipeline`], input partitions are identified by both +/// a child index, and a partition index, whereas output partitions are only +/// identified by a partition index. Review Comment: Pipeline inputs are identified by both an upstream index and a partition index, whereas pipeline outputs are only identified by a partition index. ########## datafusion/core/src/scheduler/plan.rs: ########## @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use arrow::datatypes::SchemaRef; +use std::sync::Arc; + +use crate::error::Result; +use crate::execution::context::TaskContext; +use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use crate::physical_plan::repartition::RepartitionExec; +use crate::physical_plan::{ExecutionPlan, Partitioning}; + +use crate::scheduler::pipeline::{ + execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline, +}; + +/// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct OutputLink { + /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to + pub pipeline: usize, + + /// The child of the [`Pipeline`] to route output to + pub child: usize, +} + +/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output +#[derive(Debug)] +pub struct RoutablePipeline { + /// The pipeline that produces data + pub pipeline: Box<dyn Pipeline>, + + /// Where to send output the output of `pipeline` + /// + /// If `None`, the output should be sent to the query output + pub output: Option<OutputLink>, +} + +/// [`PipelinePlan`] is the scheduler's representation of the [`ExecutionPlan`] passed to +/// [`super::Scheduler::schedule`]. 
It combines the list of [Pipeline`] with the information +/// necessary to route output from one stage to the next +#[derive(Debug)] +pub struct PipelinePlan { + /// Schema of this plans output + pub schema: SchemaRef, + + /// Number of output partitions + pub output_partitions: usize, + + /// Pipelines that comprise this plan + pub pipelines: Vec<RoutablePipeline>, +} + +/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group +/// together multiple operators, [`OperatorGroup`] stores this state +struct OperatorGroup { + /// Where to route the output of the eventual [`Pipeline`] + output: Option<OutputLink>, + + /// The [`ExecutionPlan`] from which to start recursing + root: Arc<dyn ExecutionPlan>, + + /// The number of times to recurse into the [`ExecutionPlan`]'s children + depth: usize, +} + +/// A utility struct to assist converting from [`ExecutionPlan`] to [`PipelinePlan`] +/// +/// The [`ExecutionPlan`] is visited in a depth-first fashion, gradually building +/// up the [`RoutablePipeline`] for the [`PipelinePlan`]. As nodes are visited depth-first, +/// a node is visited only after its parent has been. 
+pub struct PipelinePlanner { + task_context: Arc<TaskContext>, + + /// The schema of this plan + schema: SchemaRef, + + /// The number of output partitions of this plan + output_partitions: usize, + + /// The current list of completed pipelines + completed: Vec<RoutablePipeline>, + + /// A list of [`ExecutionPlan`] still to visit, along with + /// where they should route their output + to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>, + + /// Stores one or more operators to combine + /// together into a single [`ExecutionPipeline`] + execution_operators: Option<OperatorGroup>, +} + +impl PipelinePlanner { + pub fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>) -> Self { + let schema = plan.schema(); + let output_partitions = plan.output_partitioning().partition_count(); + Self { + completed: vec![], + to_visit: vec![(plan, None)], + task_context, + execution_operators: None, + schema, + output_partitions, + } + } + + /// Flush the current group of operators stored in `execution_operators` + /// into a single [`ExecutionPipeline] + fn flush_exec(&mut self) -> Result<usize> { + let group = self.execution_operators.take().unwrap(); + let node_idx = self.completed.len(); + self.completed.push(RoutablePipeline { + pipeline: Box::new(ExecutionPipeline::new( + group.root, + self.task_context.clone(), + group.depth, + )?), + output: group.output, + }); + Ok(node_idx) + } + + /// Visit a non-special cased [`ExecutionPlan`] + fn visit_exec( + &mut self, + plan: Arc<dyn ExecutionPlan>, + parent: Option<OutputLink>, + ) -> Result<()> { + let children = plan.children(); + + // Add the operator to the current group of operators to be combined + // into a single [`ExecutionPipeline`]. + // + // TODO: More sophisticated policy, just because we can combine them doesn't mean we should Review Comment: The policy will be best combined with a new push-aware ExecutionPlan API. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
