alamb commented on code in PR #2226:
URL: https://github.com/apache/arrow-datafusion/pull/2226#discussion_r851347286
##########
datafusion/scheduler/src/pipeline/mod.rs:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::ArrowResult;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may drawn on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Push vs Pull Execution
+///
+/// Whilst the interface exposed to the scheduler is push-based, in which member functions

Review Comment:
```suggestion
/// Whilst the interface exposed to the scheduler is push-based, the order of member function
```
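For orientation, a minimal sketch of what a trait along these lines could look like, pieced together from the doc comment and imports above; only `Pipeline::push` and `Pipeline::close` are named in the docs, so `poll_partition` and `output_partitions` here are assumptions rather than the PR's actual signatures:

```rust
use std::task::{Context, Poll};

use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

/// Hypothetical push-based pipeline interface, reconstructed from the
/// doc comment above; not necessarily the exact trait in this PR
pub trait Pipeline: Send + Sync {
    /// Push a batch to an input partition, identified by both the index
    /// of the upstream child it came from and that child's partition index
    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;

    /// Mark an input partition as exhausted
    fn close(&self, child: usize, partition: usize);

    /// The number of output partitions (assumed accessor)
    fn output_partitions(&self) -> usize;

    /// Poll an output partition for data, registering the waker from `cx`
    /// so the scheduler is notified when further progress becomes possible
    fn poll_partition(
        &self,
        cx: &mut Context<'_>,
        partition: usize,
    ) -> Poll<Option<ArrowResult<RecordBatch>>>;
}
```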
##########
datafusion/scheduler/src/pipeline/execution.rs:
##########
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+use arrow::error::ArrowError;
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::{
+    displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+
+use crate::pipeline::Pipeline;
+use crate::BoxStream;
+
+/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
+/// converts it to the push-based [`Pipeline`] interface
+///
+/// Internally [`ExecutionPipeline`] is still pull-based which limits its parallelism
+/// to that of its output partitioning, however, it provides full compatibility with
+/// [`ExecutionPlan`] allowing full interoperability with the existing ecosystem
+///
+/// Longer term we will likely want to introduce new traits that differentiate between
+/// stateless operators like filters, and stateful aggregations, and are better aligned
+/// with a push-based execution model. This in turn will allow for [`Pipeline`] implementations
+/// that are able to introduce parallelism beyond that expressed in their partitioning

Review Comment:
I still don't understand this distinction between stateful and non-stateful operators. Maybe the difference is the "pipeline breaking" ability (the standard database term for operators that may not produce output until they have seen some or all of their input).
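To make the "pipeline breaking" distinction concrete, here is a toy sketch (not code from this PR; the types and method shapes are invented for illustration): a filter can emit output for every batch pushed into it, whereas a sort must buffer all input before producing anything.

```rust
use arrow::record_batch::RecordBatch;

/// Stateless: every pushed batch can be processed and emitted immediately
struct ToyFilter;

impl ToyFilter {
    fn push(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
        // a real filter would apply a predicate to `batch` here; either
        // way it can emit straight away and never stalls the pipeline
        Some(batch)
    }
}

/// Pipeline breaking: no output can be produced until all input is seen
struct ToySort {
    buffered: Vec<RecordBatch>,
}

impl ToySort {
    fn push(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
        // accumulate state; a later batch may sort before an earlier one,
        // so nothing can be emitted yet
        self.buffered.push(batch);
        None
    }

    fn close(&mut self) -> Vec<RecordBatch> {
        // only once the input is exhausted can sorted output be produced
        // (the actual sort is elided here)
        std::mem::take(&mut self.buffered)
    }
}
```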
##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::{debug, error};
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// Builder for a [`Scheduler`]
+#[derive(Debug)]
+pub struct SchedulerBuilder {
+    inner: ThreadPoolBuilder,
+}
+
+impl SchedulerBuilder {
+    /// Create a new [`SchedulerConfig`] with the provided number of threads
+    pub fn new(num_threads: usize) -> Self {
+        let builder = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .panic_handler(|p| error!("{}", format_worker_panic(p)))
+            .thread_name(|idx| format!("df-worker-{}", idx));
+
+        Self { inner: builder }
+    }
+
+    /// Registers a custom panic handler
+    ///
+    /// Used by tests
+    #[allow(dead_code)]
+    fn panic_handler<H>(self, panic_handler: H) -> Self
+    where
+        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
+    {
+        Self {
+            inner: self.inner.panic_handler(panic_handler),
+        }
+    }
+
+    /// Build a new [`Scheduler`]
+    fn build(self) -> Scheduler {
+        Scheduler {
+            pool: Arc::new(self.inner.build().unwrap()),
+        }
+    }
+}
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// and is designed to decouple the execution parallelism from the parallelism expressed in
+/// the physical plan as partitions.
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.
+///
+/// The scheduler then maintains a list of pending [`Task`], that identify a partition within
+/// a particular pipeline that may be able to make progress on some "morsel" of data. These
+/// [`Task`] are then scheduled on the worker pool, with a preference for scheduling work
+/// on a given "morsel" on the same thread that produced it.
+///
+/// # Rayon
+///
+/// Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing
+/// scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+/// structured concurrency primitives to express additional parallelism that may be exploited
+/// if there are idle threads available at runtime
+///
+/// # Shutdown
+///
+/// TBC
+///
+/// [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+/// [rayon]: https://docs.rs/rayon/latest/rayon/
+///
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool
+    pub fn new(num_threads: usize) -> Self {
+        SchedulerBuilder::new(num_threads).build()
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`BoxStream`] that can be used to receive results as they are produced
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<BoxStream<'static, ArrowResult<RecordBatch>>> {
+        let (query, receiver) = Query::new(plan, context, self.spawner())?;
+        spawn_query(Arc::new(query));
+        Ok(receiver.boxed())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Formats a panic message for a worker
+fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
+    let maybe_idx = rayon::current_thread_index();
+    let worker: &dyn std::fmt::Display = match &maybe_idx {
+        Some(idx) => idx,
+        None => &"UNKNOWN",
+    };
+
+    let message = if let Some(msg) = panic.downcast_ref::<&str>() {
+        *msg
+    } else if let Some(msg) = panic.downcast_ref::<String>() {
+        msg.as_str()
+    } else {
+        "UNKNOWN"
+    };
+
+    format!("worker {} panicked with: {}", worker, message)
+}
+
+/// Returns `true` if the current thread is a worker thread
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local workers thread pool
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,

Review Comment:
👍
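As context for the API under review, here is a sketch of how a caller might consume the scheduler. It relies only on the `Scheduler::schedule` signature visible in the hunk above; the helper function name is invented for illustration:

```rust
use std::sync::Arc;

use futures::StreamExt;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper: run `plan` on the scheduler and collect its output
async fn collect_results(
    scheduler: &Scheduler, // the `Scheduler` type from the hunk above
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
    // `schedule` returns a stream that yields batches as the worker
    // threads produce them, decoupled from the plan's partitioning
    let mut stream = scheduler.schedule(plan, context)?;

    let mut results = Vec::new();
    while let Some(batch) = stream.next().await {
        results.push(batch?); // `?` converts ArrowError into DataFusionError
    }
    Ok(results)
}
```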
##########
datafusion/scheduler/src/pipeline/mod.rs:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::ArrowResult;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may drawn on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Push vs Pull Execution
+///
+/// Whilst the interface exposed to the scheduler is push-based, in which member functions
+/// computation is performed is intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
+/// any output partitions that may now be able to make progress. This may be none none
+/// if the operator is waiting on data from a different input partition
+///
+/// An aggregation pipeline which combines data from one or more input partitions into
+/// a single output partition. [`Pipeline::push`] would eagerly update the computed
+/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output

Review Comment:
There is another related strategy for hash aggregation, which is to do partial aggregation: when the hash table is full, flush its contents as output and start again fresh (meaning many batch pushes produce no output, but some pushes would produce output, namely the contents of the hash table). Perhaps this is similar to the merge pipeline from a scheduler's perspective.
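For illustration, a toy sketch of that flush-on-full strategy (not code from this PR; the counting aggregate and `max_groups` threshold are invented): most pushes only update the hash table, but a push that fills it emits the accumulated partial aggregates for a downstream final aggregation to merge.

```rust
use std::collections::HashMap;

/// Toy partial COUNT(*) aggregation that flushes when its table is full
struct PartialCount {
    groups: HashMap<u64, u64>, // group key -> count
    max_groups: usize,         // "full" threshold for the hash table
}

impl PartialCount {
    fn push(&mut self, keys: &[u64]) -> Option<Vec<(u64, u64)>> {
        for &key in keys {
            *self.groups.entry(key).or_insert(0) += 1;
        }
        if self.groups.len() >= self.max_groups {
            // the table is full: emit its contents as partial aggregates
            // and start again fresh
            Some(self.groups.drain().collect())
        } else {
            // the common case: this push produces no output
            None
        }
    }

    fn close(&mut self) -> Vec<(u64, u64)> {
        // flush whatever partial state remains when the input is exhausted
        self.groups.drain().collect()
    }
}
```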
##########
datafusion/scheduler/src/pipeline/mod.rs:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::ArrowResult;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may drawn on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Push vs Pull Execution
+///
+/// Whilst the interface exposed to the scheduler is push-based, in which member functions
+/// computation is performed is intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
+/// any output partitions that may now be able to make progress. This may be none none

Review Comment:
```suggestion
/// any output partitions that may now be able to make progress. This may be none
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
