yjshen commented on code in PR #2226:
URL: https://github.com/apache/arrow-datafusion/pull/2226#discussion_r863799676


##########
datafusion/core/src/scheduler/mod.rs:
##########
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`Scheduler`] maintains a pool of dedicated worker threads on which
+//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+//! and is designed to decouple the execution parallelism from the parallelism expressed in
+//! the physical plan as partitions.
+//!
+//! # Implementation
+//!
+//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+//! chunks called pipelines. Each pipeline may consist of one or more nodes from the
+//! [`ExecutionPlan`] tree.
+//!
+//! The scheduler then maintains a list of pending [`Task`], that identify a partition within
+//! a particular pipeline that may be able to make progress on some "morsel" of data. These
+//! [`Task`] are then scheduled on the worker pool, with a preference for scheduling work
+//! on a given "morsel" on the same thread that produced it.
+//!
+//! # Rayon
+//!
+//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a lightweight, work-stealing
+//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+//! structured concurrency primitives to express additional parallelism that may be exploited
+//! if there are idle threads available at runtime
+//!
+//! # Shutdown
+//!
+//! Queries scheduled on a [`Scheduler`] will run to completion even if the
+//! [`Scheduler`] is dropped
+//!
+//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+//! [rayon]: https://docs.rs/rayon/latest/rayon/
+//!
+//! # Example
+//!
+//! ```rust
+//! # use futures::TryStreamExt;
+//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+//! # use datafusion_scheduler::Scheduler;
+//!
+//! # #[tokio::main]
+//! # async fn main() {
+//! let scheduler = Scheduler::new(4);
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let context = SessionContext::with_config(config);
+//!
+//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap();
+//! let plan = context.sql("SELECT MIN(b) FROM example")
+//!     .await
+//!    .unwrap()
+//!    .create_physical_plan()
+//!    .await
+//!    .unwrap();
+//!
+//! let task = context.task_ctx();

Review Comment:
   The name `task` is slightly misleading here, since it can be confused with the pipeline's `Task`.



##########
datafusion/core/src/scheduler/mod.rs:
##########
@@ -0,0 +1,454 @@
+//! # Example
+//!
+//! ```rust
+//! # use futures::TryStreamExt;
+//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+//! # use datafusion_scheduler::Scheduler;
+//!
+//! # #[tokio::main]
+//! # async fn main() {
+//! let scheduler = Scheduler::new(4);
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let context = SessionContext::with_config(config);
+//!
+//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap();
+//! let plan = context.sql("SELECT MIN(b) FROM example")
+//!     .await
+//!    .unwrap()
+//!    .create_physical_plan()
+//!    .await
+//!    .unwrap();
+//!
+//! let task = context.task_ctx();
+//! let stream = scheduler.schedule(plan, task).unwrap();
+//! let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+//! # }
+//! ```
+//!
+
+use std::sync::Arc;
+
+use futures::stream::BoxStream;
+use log::{debug, error};
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::ExecutionPlan;
+
+use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline};
+use task::{spawn_plan, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+pub use task::ExecutionResults;
+
+mod pipeline;
+mod plan;
+mod task;
+
+/// Builder for a [`Scheduler`]
+#[derive(Debug)]
+pub struct SchedulerBuilder {
+    inner: ThreadPoolBuilder,
+}
+
+impl SchedulerBuilder {
+    /// Create a new [`SchedulerConfig`] with the provided number of threads
+    pub fn new(num_threads: usize) -> Self {
+        let builder = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .panic_handler(|p| error!("{}", format_worker_panic(p)))
+            .thread_name(|idx| format!("df-worker-{}", idx));
+
+        Self { inner: builder }
+    }
+
+    /// Registers a custom panic handler
+    #[cfg(test)]
+    fn panic_handler<H>(self, panic_handler: H) -> Self
+    where
+        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
+    {
+        Self {
+            inner: self.inner.panic_handler(panic_handler),
+        }
+    }
+
+    /// Build a new [`Scheduler`]
+    fn build(self) -> Scheduler {
+        Scheduler {
+            pool: Arc::new(self.inner.build().unwrap()),
+        }
+    }
+}
+
+/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a dedicated thread pool
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool
+    pub fn new(num_threads: usize) -> Self {
+        SchedulerBuilder::new(num_threads).build()
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`ExecutionResults`] that can be used to receive results as they are produced,
+    /// as a [`futures::Stream`] of [`RecordBatch`]
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<ExecutionResults> {
+        let plan = PipelinePlanner::new(plan, context).build()?;
+        Ok(self.schedule_plan(plan))
+    }
+
+    /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`].
+    pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults {
+        spawn_plan(plan, self.spawner())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Formats a panic message for a worker
+fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
+    let maybe_idx = rayon::current_thread_index();
+    let worker: &dyn std::fmt::Display = match &maybe_idx {
+        Some(idx) => idx,
+        None => &"UNKNOWN",
+    };
+
+    let message = if let Some(msg) = panic.downcast_ref::<&str>() {
+        *msg
+    } else if let Some(msg) = panic.downcast_ref::<String>() {
+        msg.as_str()
+    } else {
+        "UNKNOWN"
+    };
+
+    format!("worker {} panicked with: {}", worker, message)
+}
+
+/// Returns `true` if the current thread is a rayon worker thread
+///
+/// Note: if there are multiple rayon pools, this will return `true` if the current thread
+/// belongs to ANY rayon pool, even if this isn't a worker thread of a [`Scheduler`] instance
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local workers thread pool
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,
+/// `spawn_local` will append to the front of the current worker's queue, workers pop tasks from
+/// the front of their queue, and steal tasks from the back of other workers queues
+///
+/// The effect is that tasks spawned using `spawn_local` will typically be prioritised in
+/// a LIFO order, however, this should not be relied upon
+fn spawn_local(task: Task) {
+    // Verify is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn(|| task.do_work())
+}
+
+/// Spawn a [`Task`] onto the local workers thread pool with fifo ordering
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,
+/// `spawn_local_fifo` will append to the back of the current worker's queue, workers pop tasks
+/// from the front of their queue, and steal tasks from the back of other workers queues
+///
+/// The effect is that tasks spawned using `spawn_local_fifo` will typically be prioritised
+/// in a FIFO order, however, this should not be relied upon
+fn spawn_local_fifo(task: Task) {
+    // Verify is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn_fifo(|| task.do_work())
+}
+
+#[derive(Debug, Clone)]
+pub struct Spawner {

Review Comment:
   Could you add a doc comment for `Spawner`? We can check clippy with:
   
   ```
   cargo clippy --all-targets --features=scheduler --workspace -- -D warnings
   ```



##########
datafusion/core/src/scheduler/mod.rs:
##########
@@ -0,0 +1,454 @@
+//! # Example
+//!
+//! ```rust
+//! # use futures::TryStreamExt;
+//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+//! # use datafusion_scheduler::Scheduler;
+//!
+//! # #[tokio::main]
+//! # async fn main() {
+//! let scheduler = Scheduler::new(4);
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let context = SessionContext::with_config(config);
+//!
+//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap();
+//! let plan = context.sql("SELECT MIN(b) FROM example")
+//!     .await
+//!    .unwrap()
+//!    .create_physical_plan()
+//!    .await
+//!    .unwrap();

Review Comment:
   nit: the four lines above are indented one space less than the first `.await`.



##########
datafusion/core/src/scheduler/pipeline/execution.rs:
##########
@@ -0,0 +1,330 @@
+use std::any::Any;
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+use arrow::error::ArrowError;
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::MetricsSet;
+use crate::physical_plan::{
+    displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+
+use crate::scheduler::pipeline::Pipeline;
+use crate::scheduler::BoxStream;
+
+/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
+/// converts it to the push-based [`Pipeline`] interface
+///
+/// Internally [`ExecutionPipeline`] is still pull-based which limits its parallelism
+/// to that of its output partitioning, however, it provides full compatibility with
+/// [`ExecutionPlan`] allowing full interoperability with the existing ecosystem
+///
+/// Longer term we will likely want to introduce new traits that differentiate between
+/// pipeline-able operators like filters, and pipeline-breakers like aggregations, and
+/// are better aligned with a push-based execution model.

Review Comment:
   👍



##########
datafusion/core/src/scheduler/pipeline/mod.rs:
##########
@@ -0,0 +1,110 @@
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::error::Result;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may drawn on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Eager vs Lazy Execution
+///
+/// Whether computation is eagerly done on push, or lazily done on pull, is
+/// intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
+/// any output partitions that may now be able to make progress. This may be none if
+/// the operator is waiting on data from a different input partition
+///
+/// An aggregation pipeline which combines data from one or more input partitions into
+/// a single output partition. [`Pipeline::push`] would eagerly update the computed
+/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output.

Review Comment:
   Should this say that `close` marks the end of input, and `poll_partition` then flushes the aggregate state to the output?
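
To make the question concrete, here is a minimal, self-contained sketch of the split I have in mind. `MinPipeline`, its fields, and the simplified signatures are all hypothetical (the real `Pipeline::poll_partition` takes a `Context` and would register a waker, which is omitted here), so treat it as an illustration under those assumptions rather than the PR's implementation:

```rust
use std::sync::Mutex;
use std::task::Poll;

/// Hypothetical, simplified stand-in for an aggregation pipeline that
/// tracks a running minimum. `&[i64]` stands in for a `RecordBatch`.
#[derive(Debug, Default)]
pub struct MinPipeline {
    state: Mutex<MinState>,
}

#[derive(Debug, Default)]
struct MinState {
    /// The aggregate computed so far
    min: Option<i64>,
    /// Set by `close`, i.e. no more input will arrive
    input_closed: bool,
    /// Set once the aggregate has been handed to the output
    flushed: bool,
}

impl MinPipeline {
    /// Eagerly fold a batch into the aggregate state; produces no output
    pub fn push(&self, batch: &[i64]) {
        let mut state = self.state.lock().unwrap();
        assert!(!state.input_closed, "push called after close");
        for v in batch {
            state.min = Some(state.min.map_or(*v, |m| m.min(*v)));
        }
    }

    /// Only marks the end of input; the flush itself happens on poll
    pub fn close(&self) {
        self.state.lock().unwrap().input_closed = true;
    }

    /// Pending until the input is closed, then yields the aggregate
    /// once, then signals end of stream with `None`
    pub fn poll_partition(&self) -> Poll<Option<i64>> {
        let mut state = self.state.lock().unwrap();
        if !state.input_closed {
            return Poll::Pending;
        }
        if state.flushed {
            return Poll::Ready(None);
        }
        state.flushed = true;
        Poll::Ready(state.min.take())
    }
}
```

In this reading, `close` never writes to the output itself; it only changes what the next `poll_partition` is allowed to return.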



##########
datafusion/core/src/scheduler/pipeline/mod.rs:
##########
@@ -0,0 +1,110 @@
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::error::Result;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may drawn on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Eager vs Lazy Execution
+///
+/// Whether computation is eagerly done on push, or lazily done on pull, is
+/// intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
+/// any output partitions that may now be able to make progress. This may be none if
+/// the operator is waiting on data from a different input partition
+///
+/// An aggregation pipeline which combines data from one or more input partitions into
+/// a single output partition. [`Pipeline::push`] would eagerly update the computed
+/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output.
+/// It would also be possible to flush once the partial aggregates reach a certain size
+///
+/// A partition-aware aggregation pipeline, which functions similarly to the above, but
+/// computes aggregations per input partition, before combining these prior to flush.
+///
+/// An async input pipeline, which has no inputs, and wakes the output partition
+/// whenever new data is available
+///
+/// A JIT compiled sequence of synchronous operators, that perform multiple operations
+/// from the physical plan as a single [`Pipeline`]. Parallelized implementations
+/// are also possible
+///
+pub trait Pipeline: Send + Sync + std::fmt::Debug {
+    /// Push a [`RecordBatch`] to the given input partition
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;

Review Comment:
   I suppose `child` comes from the `ExecutionPlan` children that provide input to the plan, but it is a little vague in the pipeline context. Would `upstream`/`downstream` be more meaningful names for pipelines?
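
For illustration, this is roughly how the signature would read with the rename. `MergeSketch`, its buffer layout, and the `String` error type are hypothetical and exist only so the sketch is self-contained; `Vec<i64>` stands in for `RecordBatch`:

```rust
use std::sync::Mutex;

/// Hypothetical pipeline fed by several upstream pipelines
#[derive(Debug)]
pub struct MergeSketch {
    /// One buffer per (upstream, partition) pair
    buffers: Mutex<Vec<Vec<Vec<i64>>>>,
}

impl MergeSketch {
    /// Create a sketch with `upstreams` inputs, each with `partitions` partitions
    pub fn new(upstreams: usize, partitions: usize) -> Self {
        Self {
            buffers: Mutex::new(vec![vec![Vec::new(); partitions]; upstreams]),
        }
    }

    /// Same shape as `Pipeline::push`, with `child` renamed to `upstream`:
    /// the index selects which upstream pipeline produced `input`
    pub fn push(
        &self,
        input: Vec<i64>,
        upstream: usize,
        partition: usize,
    ) -> Result<(), String> {
        let mut buffers = self.buffers.lock().unwrap();
        let slot = buffers
            .get_mut(upstream)
            .and_then(|partitions| partitions.get_mut(partition))
            .ok_or_else(|| format!("no input {}/{}", upstream, partition))?;
        slot.extend(input);
        Ok(())
    }

    /// Number of values buffered for the given upstream and partition
    pub fn buffered(&self, upstream: usize, partition: usize) -> usize {
        self.buffers.lock().unwrap()[upstream][partition].len()
    }
}
```

The behaviour is unchanged; only the parameter name differs, which I find easier to follow at call sites such as `push(batch, link.child, partition)`.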



##########
datafusion/core/src/scheduler/task.rs:
##########
@@ -0,0 +1,497 @@
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use crate::scheduler::{
+    is_worker, plan::PipelinePlan, spawn_local, spawn_local_fifo, RoutablePipeline,
+    Spawner,
+};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
+use arrow::record_batch::RecordBatch;
+use futures::channel::mpsc;
+use futures::task::ArcWake;
+use futures::{ready, Stream, StreamExt};
+use log::{debug, trace};
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
+
+/// Spawns a [`PipelinePlan`] using the provided [`Spawner`]
+pub fn spawn_plan(plan: PipelinePlan, spawner: Spawner) -> ExecutionResults {
+    debug!("Spawning pipeline plan: {:#?}", plan);
+
+    let (senders, receivers) = (0..plan.output_partitions)
+        .map(|_| mpsc::unbounded())
+        .unzip::<_, _, Vec<_>, Vec<_>>();
+
+    let context = Arc::new(ExecutionContext {
+        spawner,
+        pipelines: plan.pipelines,
+        schema: plan.schema,
+        output: senders,
+    });
+
+    for (pipeline_idx, query_pipeline) in context.pipelines.iter().enumerate() {
+        for partition in 0..query_pipeline.pipeline.output_partitions() {
+            context.spawner.spawn(Task {
+                context: context.clone(),
+                waker: Arc::new(TaskWaker {
+                    context: Arc::downgrade(&context),
+                    wake_count: AtomicUsize::new(1),
+                    pipeline: pipeline_idx,
+                    partition,
+                }),
+            });
+        }
+    }
+
+    let partitions = receivers
+        .into_iter()
+        .map(|receiver| ExecutionResultStream {
+            receiver: receiver,
+            context: context.clone(),
+        })
+        .collect();
+
+    ExecutionResults {
+        streams: partitions,
+        context,
+    }
+}
+
+/// A [`Task`] identifies an output partition within a given pipeline that may be able to
+/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding
+/// [`Task`] and distributes them amongst its worker threads.
+pub struct Task {
+    /// Maintain a link to the [`ExecutionContext`] this is necessary to be able to
+    /// route the output of the partition to its destination
+    context: Arc<ExecutionContext>,
+
+    /// A [`ArcWake`] that can be used to re-schedule this [`Task`] for execution
+    waker: Arc<TaskWaker>,
+}
+
+impl std::fmt::Debug for Task {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let output = &self.context.pipelines[self.waker.pipeline].output;
+
+        f.debug_struct("Task")
+            .field("pipeline", &self.waker.pipeline)
+            .field("partition", &self.waker.partition)
+            .field("output", &output)
+            .finish()
+    }
+}
+
+impl Task {
+    fn handle_error(
+        &self,
+        partition: usize,
+        routable: &RoutablePipeline,
+        error: DataFusionError,
+    ) {
+        self.context.send_query_output(partition, Err(error));
+        if let Some(link) = routable.output {
+            trace!(
+                "Closing pipeline: {:?}, partition: {}, due to error",
+                link,
+                self.waker.partition,
+            );
+
+            self.context.pipelines[link.pipeline]
+                .pipeline
+                .close(link.child, self.waker.partition);
+        }
+    }
+
+    /// Call [`Pipeline::poll_partition`], attempting to make progress on query execution
+    pub fn do_work(self) {
+        assert!(is_worker(), "Task::do_work called outside of worker pool");
+        if self.context.is_cancelled() {
+            return;
+        }
+
+        // Capture the wake count prior to calling [`Pipeline::poll_partition`]
+        // this allows us to detect concurrent wake ups and handle them correctly
+        let wake_count = self.waker.wake_count.load(Ordering::SeqCst);
+
+        let node = self.waker.pipeline;
+        let partition = self.waker.partition;
+
+        let waker = futures::task::waker_ref(&self.waker);
+        let mut cx = Context::from_waker(&*waker);
+
+        let pipelines = &self.context.pipelines;
+        let routable = &pipelines[node];
+        match routable.pipeline.poll_partition(&mut cx, partition) {
+            Poll::Ready(Some(Ok(batch))) => {
+                trace!("Poll {:?}: Ok: {}", self, batch.num_rows());
+                match routable.output {
+                    Some(link) => {
+                        trace!(
+                            "Publishing batch to pipeline {:?} partition {}",
+                            link,
+                            partition
+                        );
+
+                        let r = pipelines[link.pipeline]
+                            .pipeline
+                            .push(batch, link.child, partition);
+
+                        if let Err(e) = r {
+                            self.handle_error(partition, routable, e);
+
+                            // Return without rescheduling this output again
+                            return;
+                        }
+                    }
+                    None => {
+                        trace!("Publishing batch to output");
+                        self.context.send_query_output(partition, Ok(batch))
+                    }
+                }
+
+                // Reschedule this pipeline again
+                //
+                // We want to prioritise running tasks triggered by the most recent
+                // batch, so reschedule with FIFO ordering
+                //
+                // Note: We must schedule after we have routed the batch, otherwise
+                // we introduce a potential ordering race where the newly scheduled
+                // task runs before this task finishes routing the output
+                spawn_local_fifo(self);
+            }
+            Poll::Ready(Some(Err(e))) => {
+                trace!("Poll {:?}: Error: {:?}", self, e);
+                self.handle_error(partition, routable, e)
+            }
+            Poll::Ready(None) => {
+                trace!("Poll {:?}: None", self);
+                match routable.output {
+                    Some(link) => {
+                        trace!("Closing pipeline: {:?}, partition: {}", link, partition);
+                        pipelines[link.pipeline]
+                            .pipeline
+                            .close(link.child, partition)
+                    }
+                    None => self.context.finish(partition),
+                }
+            }
+            Poll::Pending => {
+                trace!("Poll {:?}: Pending", self);
+                // Attempt to reset the wake count with the value obtained prior
+                // to calling [`Pipeline::poll_partition`].
+                //
+                // If this fails it indicates a wakeup was received whilst executing
+                // [`Pipeline::poll_partition`] and we should reschedule the task
+                let reset = self.waker.wake_count.compare_exchange(
+                    wake_count,
+                    0,
+                    Ordering::SeqCst,
+                    Ordering::SeqCst,
+                );
+
+                if reset.is_err() {
+                    trace!("Wakeup triggered whilst polling: {:?}", self);
+                    spawn_local(self);
+                }
+            }
+        }
+    }
+}
+
+/// The results of the execution of a query
+pub struct ExecutionResults {
+    /// [`ExecutionResultStream`] for each partition of this query
+    streams: Vec<ExecutionResultStream>,
+
+    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
+    context: Arc<ExecutionContext>,
+}
+
+impl ExecutionResults {
+    /// Returns a [`SendableRecordBatchStream`] of this execution
+    ///
+    /// In the event of multiple output partitions, the output will be interleaved
+    pub fn stream(self) -> SendableRecordBatchStream {
+        let schema = self.context.schema.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::select_all(self.streams),
+        ))
+    }
+
+    /// Returns a [`SendableRecordBatchStream`] for each partition of this execution
+    pub fn stream_partitioned(self) -> Vec<SendableRecordBatchStream> {
+        self.streams.into_iter().map(|s| Box::pin(s) as _).collect()
+    }
+}
+
+/// A result stream for the execution of a query
+struct ExecutionResultStream {
+    receiver: mpsc::UnboundedReceiver<Option<Result<RecordBatch>>>,
+
+    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
+    context: Arc<ExecutionContext>,
+}
+
+impl Stream for ExecutionResultStream {
+    type Item = arrow::error::Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let opt = ready!(self.receiver.poll_next_unpin(cx)).flatten();
+        Poll::Ready(opt.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e)))))
+    }
+}
+
+impl RecordBatchStream for ExecutionResultStream {
+    fn schema(&self) -> SchemaRef {
+        self.context.schema.clone()
+    }
+}
+
+/// The shared state of all [`Task`] created from the same [`PipelinePlan`]
+#[derive(Debug)]
+struct ExecutionContext {
+    /// Spawner for this query
+    spawner: Spawner,
+
+    /// List of pipelines that belong to this query, pipelines are addressed
+    /// based on their index within this list
+    pipelines: Vec<RoutablePipeline>,
+
+    /// Schema of this plan's output
+    pub schema: SchemaRef,
+
+    /// The output streams, per partition, for this query's execution
+    output: Vec<mpsc::UnboundedSender<Option<Result<RecordBatch>>>>,
+}
+
+impl Drop for ExecutionContext {
+    fn drop(&mut self) {
+        debug!("ExecutionContext dropped");
+    }
+}
+
+impl ExecutionContext {
+    /// Returns `true` if this query has been dropped, specifically if the
+    /// stream returned by [`super::Scheduler::schedule`] has been dropped
+    fn is_cancelled(&self) -> bool {
+        self.output.iter().all(|x| x.is_closed())
+    }
+
+    /// Sends `output` to this query's output stream
+    fn send_query_output(&self, partition: usize, output: Result<RecordBatch>) {
+        let _ = self.output[partition].unbounded_send(Some(output));
+    }
+
+    /// Mark this partition as finished
+    fn finish(&self, partition: usize) {
+        let _ = self.output[partition].unbounded_send(None);
+    }
+}
+
+struct TaskWaker {
+    /// Store a weak reference to the [`ExecutionContext`] to avoid reference cycles if this
+    /// [`Waker`] is stored within a [`Pipeline`] owned by the [`ExecutionContext`]
+    context: Weak<ExecutionContext>,
+
+    /// A counter that stores the number of times this has been awoken
+    ///
+    /// A value > 0, implies the task is either in the ready queue or
+    /// currently being executed
+    ///
+    /// `TaskWaker::wake` always increments the `wake_count`, however, it only
+    /// re-enqueues the [`Task`] if the value prior to increment was 0
+    ///
+    /// This ensures that a given [`Task`] is not enqueued multiple times
+    ///
+    /// We store an integer, as opposed to a boolean, so that wake ups that
+    /// occur during [`Pipeline::poll_partition`] can be detected and handled
+    /// after it has finished executing
+    ///
+    wake_count: AtomicUsize,
+
+    /// The index of the pipeline within `query` to poll
+    pipeline: usize,
+
+    /// The partition of the pipeline within `query` to poll
+    partition: usize,

Review Comment:
   👍 Wake up the waker and re-enqueue the task.
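
A minimal sketch of the wake-count handshake described in the `wake_count` doc comment and the `Poll::Pending` arm above, using only the standard library. `WakeCounter`, `new_enqueued`, `wake`, `load` and `must_reschedule` are hypothetical names for illustration, and starting the counter at 1 is an assumption about how a freshly enqueued task is represented; the PR's actual `TaskWaker` may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for `TaskWaker`: only the counter is modelled.
pub struct WakeCounter {
    wake_count: AtomicUsize,
}

impl WakeCounter {
    /// Assumption: a task that is already in the ready queue has count 1.
    pub fn new_enqueued() -> Self {
        Self {
            wake_count: AtomicUsize::new(1),
        }
    }

    /// Records a wakeup. Returns `true` only if the count was 0 before the
    /// increment, i.e. the caller is responsible for enqueueing the task.
    pub fn wake(&self) -> bool {
        self.wake_count.fetch_add(1, Ordering::SeqCst) == 0
    }

    /// Snapshot of the counter, taken just before polling the pipeline.
    pub fn load(&self) -> usize {
        self.wake_count.load(Ordering::SeqCst)
    }

    /// Called after the poll returned `Pending`. Tries to reset the counter
    /// from the snapshot back to 0. Returns `true` if that fails, meaning a
    /// wakeup arrived during the poll and the task must be rescheduled.
    pub fn must_reschedule(&self, seen: usize) -> bool {
        self.wake_count
            .compare_exchange(seen, 0, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
    }
}
```

In this sketch a failed `compare_exchange` leaves the counter untouched, so a rescheduled task would observe the larger value when it takes its next snapshot.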



##########
datafusion/core/src/scheduler/mod.rs:
##########
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`Scheduler`] maintains a pool of dedicated worker threads on which
+//! query execution can be scheduled. This is based on the idea of 
[Morsel-Driven Parallelism]
+//! and is designed to decouple the execution parallelism from the parallelism 
expressed in
+//! the physical plan as partitions.
+//!
+//! # Implementation
+//!
+//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it 
up into smaller
+//! chunks called pipelines. Each pipeline may consist of one or more nodes 
from the
+//! [`ExecutionPlan`] tree.
+//!
+//! The scheduler then maintains a list of pending [`Task`], that identify a 
partition within
+//! a particular pipeline that may be able to make progress on some "morsel" 
of data. These
+//! [`Task`] are then scheduled on the worker pool, with a preference for 
scheduling work
+//! on a given "morsel" on the same thread that produced it.
+//!
+//! # Rayon
+//!
+//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a 
lightweight, work-stealing
+//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this 
fact, and use [rayon]'s
+//! structured concurrency primitives to express additional parallelism that 
may be exploited
+//! if there are idle threads available at runtime
+//!
+//! # Shutdown
+//!
+//! Queries scheduled on a [`Scheduler`] will run to completion even if the
+//! [`Scheduler`] is dropped
+//!
+//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+//! [rayon]: https://docs.rs/rayon/latest/rayon/
+//!
+//! # Example
+//!
+//! ```rust
+//! # use futures::TryStreamExt;
+//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+//! # use datafusion_scheduler::Scheduler;
+//!
+//! # #[tokio::main]
+//! # async fn main() {
+//! let scheduler = Scheduler::new(4);
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let context = SessionContext::with_config(config);
+//!
+//! context.register_csv("example", "../core/tests/example.csv", 
CsvReadOptions::new()).await.unwrap();
+//! let plan = context.sql("SELECT MIN(b) FROM example")
+//!     .await
+//!    .unwrap()
+//!    .create_physical_plan()
+//!    .await
+//!    .unwrap();
+//!
+//! let task = context.task_ctx();
+//! let stream = scheduler.schedule(plan, task).unwrap();
+//! let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+//! # }
+//! ```
+//!
+
+use std::sync::Arc;
+
+use futures::stream::BoxStream;
+use log::{debug, error};
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::ExecutionPlan;
+
+use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline};
+use task::{spawn_plan, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+pub use task::ExecutionResults;
+
+mod pipeline;
+mod plan;
+mod task;
+
+/// Builder for a [`Scheduler`]
+#[derive(Debug)]
+pub struct SchedulerBuilder {
+    inner: ThreadPoolBuilder,
+}
+
+impl SchedulerBuilder {
+    /// Create a new [`SchedulerBuilder`] with the provided number of threads
+    pub fn new(num_threads: usize) -> Self {
+        let builder = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .panic_handler(|p| error!("{}", format_worker_panic(p)))
+            .thread_name(|idx| format!("df-worker-{}", idx));
+
+        Self { inner: builder }
+    }
+
+    /// Registers a custom panic handler
+    #[cfg(test)]
+    fn panic_handler<H>(self, panic_handler: H) -> Self
+    where
+        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
+    {
+        Self {
+            inner: self.inner.panic_handler(panic_handler),
+        }
+    }
+
+    /// Build a new [`Scheduler`]
+    fn build(self) -> Scheduler {
+        Scheduler {
+            pool: Arc::new(self.inner.build().unwrap()),
+        }
+    }
+}
+
+/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a 
dedicated thread pool
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` new threads in a 
dedicated thread pool
+    pub fn new(num_threads: usize) -> Self {
+        SchedulerBuilder::new(num_threads).build()
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns an [`ExecutionResults`] that can be used to receive results as they are produced,
+    /// as a [`futures::Stream`] of [`RecordBatch`]
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<ExecutionResults> {
+        let plan = PipelinePlanner::new(plan, context).build()?;
+        Ok(self.schedule_plan(plan))
+    }
+
+    /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`].
+    pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults 
{
+        spawn_plan(plan, self.spawner())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Formats a panic message for a worker
+fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
+    let maybe_idx = rayon::current_thread_index();
+    let worker: &dyn std::fmt::Display = match &maybe_idx {
+        Some(idx) => idx,
+        None => &"UNKNOWN",
+    };
+
+    let message = if let Some(msg) = panic.downcast_ref::<&str>() {
+        *msg
+    } else if let Some(msg) = panic.downcast_ref::<String>() {
+        msg.as_str()
+    } else {
+        "UNKNOWN"
+    };
+
+    format!("worker {} panicked with: {}", worker, message)
+}
+
+/// Returns `true` if the current thread is a rayon worker thread
+///
+/// Note: if there are multiple rayon pools, this will return `true` if the 
current thread
+/// belongs to ANY rayon pool, even if this isn't a worker thread of a 
[`Scheduler`] instance
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,
+/// `spawn_local` will push to the front of the current worker's queue, workers pop tasks from
+/// the front of their queue, and steal tasks from the back of other workers' queues

Review Comment:
   👍 TIL
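
A toy model of the queue discipline that the `spawn_local` doc comment describes (and that the `spawn_local_fifo` doc comment later in the same file describes for the FIFO variant). This follows the wording of the doc comments only; it is not rayon's actual data structure, and `WorkerQueue` with its methods is a hypothetical name.

```rust
use std::collections::VecDeque;

/// Toy model of a single worker's task queue.
pub struct WorkerQueue<T> {
    tasks: VecDeque<T>,
}

impl<T> WorkerQueue<T> {
    pub fn new() -> Self {
        Self {
            tasks: VecDeque::new(),
        }
    }

    /// Models `spawn_local`: the newest task goes to the front.
    pub fn push_lifo(&mut self, task: T) {
        self.tasks.push_front(task)
    }

    /// Models `spawn_local_fifo`: the newest task goes to the back.
    pub fn push_fifo(&mut self, task: T) {
        self.tasks.push_back(task)
    }

    /// The owning worker pops from the front of its own queue.
    pub fn pop(&mut self) -> Option<T> {
        self.tasks.pop_front()
    }

    /// Another worker steals from the back of this queue.
    pub fn steal(&mut self) -> Option<T> {
        self.tasks.pop_back()
    }
}
```

With `push_lifo` the owner sees the most recently spawned task first while a thief takes the oldest; with `push_fifo` the owner sees the oldest first. Stealing is what makes either order a tendency rather than a guarantee.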



##########
datafusion/core/src/scheduler/pipeline/mod.rs:
##########
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::error::Result;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing 
output
+/// to one or more output partitions. As a [`Pipeline`] may draw on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by 
both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan 
representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Eager vs Lazy Execution
+///
+/// Whether computation is eagerly done on push, or lazily done on pull, is
+/// intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and 
potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in 
[`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch 
to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output 
partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions 
into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, 
and wakes
+/// any output partitions that may now be able to make progress. This may be 
none if
+/// the operator is waiting on data from a different input partition
+///
+/// An aggregation pipeline which combines data from one or more input 
partitions into
+/// a single output partition. [`Pipeline::push`] would eagerly update the 
computed
+/// aggregates, and the final [`Pipeline::close`] trigger flushing these to 
the output.
+/// It would also be possible to flush once the partial aggregates reach a 
certain size

Review Comment:
   👍
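
A simplified sketch of two of the patterns listed in the doc comment above: an eager pipeline that does its work in `push`, and an aggregation pipeline that only emits once every input has been closed. `ToyPipeline`, `DoubleEager` and `SumOnClose` are hypothetical; batches are plain `Vec<i64>` rather than `RecordBatch`, and wakers and errors are left out, so `poll_partition` returns an `Option` where the real trait returns a `Poll`.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified stand-in for the `Pipeline` trait.
pub trait ToyPipeline {
    fn push(&mut self, input: Vec<i64>, child: usize, partition: usize);
    fn close(&mut self, child: usize, partition: usize);
    /// `None` means "nothing ready yet" in this sketch.
    fn poll_partition(&mut self, partition: usize) -> Option<Vec<i64>>;
}

/// Eager: transforms each batch inside `push` and buffers it per partition.
pub struct DoubleEager {
    ready: Vec<VecDeque<Vec<i64>>>,
}

impl DoubleEager {
    pub fn new(partitions: usize) -> Self {
        Self {
            ready: (0..partitions).map(|_| VecDeque::new()).collect(),
        }
    }
}

impl ToyPipeline for DoubleEager {
    fn push(&mut self, input: Vec<i64>, _child: usize, partition: usize) {
        let doubled: Vec<i64> = input.into_iter().map(|v| v * 2).collect();
        self.ready[partition].push_back(doubled);
    }

    fn close(&mut self, _child: usize, _partition: usize) {}

    fn poll_partition(&mut self, partition: usize) -> Option<Vec<i64>> {
        self.ready[partition].pop_front()
    }
}

/// Aggregating: updates a running sum in `push`, and flushes it to the
/// single output partition once all input partitions have been closed.
pub struct SumOnClose {
    sum: i64,
    open_inputs: usize,
    flushed: bool,
}

impl SumOnClose {
    pub fn new(input_partitions: usize) -> Self {
        Self {
            sum: 0,
            open_inputs: input_partitions,
            flushed: false,
        }
    }
}

impl ToyPipeline for SumOnClose {
    fn push(&mut self, input: Vec<i64>, _child: usize, _partition: usize) {
        self.sum += input.iter().sum::<i64>();
    }

    fn close(&mut self, _child: usize, _partition: usize) {
        self.open_inputs = self.open_inputs.saturating_sub(1);
    }

    fn poll_partition(&mut self, _partition: usize) -> Option<Vec<i64>> {
        if self.open_inputs == 0 && !self.flushed {
            self.flushed = true;
            Some(vec![self.sum])
        } else {
            None
        }
    }
}
```

Both types sit behind the same three methods, which is the point the doc comment makes: whether work happens on push or on pull stays an implementation detail of the pipeline.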



##########
datafusion/core/src/scheduler/mod.rs:
##########
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`Scheduler`] maintains a pool of dedicated worker threads on which
+//! query execution can be scheduled. This is based on the idea of 
[Morsel-Driven Parallelism]
+//! and is designed to decouple the execution parallelism from the parallelism 
expressed in
+//! the physical plan as partitions.
+//!
+//! # Implementation
+//!
+//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it 
up into smaller
+//! chunks called pipelines. Each pipeline may consist of one or more nodes 
from the
+//! [`ExecutionPlan`] tree.
+//!
+//! The scheduler then maintains a list of pending [`Task`], that identify a 
partition within
+//! a particular pipeline that may be able to make progress on some "morsel" 
of data. These
+//! [`Task`] are then scheduled on the worker pool, with a preference for 
scheduling work
+//! on a given "morsel" on the same thread that produced it.
+//!
+//! # Rayon
+//!
+//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a 
lightweight, work-stealing
+//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this 
fact, and use [rayon]'s
+//! structured concurrency primitives to express additional parallelism that 
may be exploited
+//! if there are idle threads available at runtime
+//!
+//! # Shutdown
+//!
+//! Queries scheduled on a [`Scheduler`] will run to completion even if the
+//! [`Scheduler`] is dropped
+//!
+//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+//! [rayon]: https://docs.rs/rayon/latest/rayon/
+//!
+//! # Example
+//!
+//! ```rust
+//! # use futures::TryStreamExt;
+//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+//! # use datafusion_scheduler::Scheduler;
+//!
+//! # #[tokio::main]
+//! # async fn main() {
+//! let scheduler = Scheduler::new(4);
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let context = SessionContext::with_config(config);
+//!
+//! context.register_csv("example", "../core/tests/example.csv", 
CsvReadOptions::new()).await.unwrap();
+//! let plan = context.sql("SELECT MIN(b) FROM example")
+//!     .await
+//!    .unwrap()
+//!    .create_physical_plan()
+//!    .await
+//!    .unwrap();
+//!
+//! let task = context.task_ctx();
+//! let stream = scheduler.schedule(plan, task).unwrap();
+//! let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+//! # }
+//! ```
+//!
+
+use std::sync::Arc;
+
+use futures::stream::BoxStream;
+use log::{debug, error};
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::ExecutionPlan;
+
+use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline};
+use task::{spawn_plan, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+pub use task::ExecutionResults;
+
+mod pipeline;
+mod plan;
+mod task;
+
+/// Builder for a [`Scheduler`]
+#[derive(Debug)]
+pub struct SchedulerBuilder {
+    inner: ThreadPoolBuilder,
+}
+
+impl SchedulerBuilder {
+    /// Create a new [`SchedulerBuilder`] with the provided number of threads
+    pub fn new(num_threads: usize) -> Self {
+        let builder = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .panic_handler(|p| error!("{}", format_worker_panic(p)))
+            .thread_name(|idx| format!("df-worker-{}", idx));
+
+        Self { inner: builder }
+    }
+
+    /// Registers a custom panic handler
+    #[cfg(test)]
+    fn panic_handler<H>(self, panic_handler: H) -> Self
+    where
+        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
+    {
+        Self {
+            inner: self.inner.panic_handler(panic_handler),
+        }
+    }
+
+    /// Build a new [`Scheduler`]
+    fn build(self) -> Scheduler {
+        Scheduler {
+            pool: Arc::new(self.inner.build().unwrap()),
+        }
+    }
+}
+
+/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a 
dedicated thread pool
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` new threads in a 
dedicated thread pool
+    pub fn new(num_threads: usize) -> Self {
+        SchedulerBuilder::new(num_threads).build()
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns an [`ExecutionResults`] that can be used to receive results as they are produced,
+    /// as a [`futures::Stream`] of [`RecordBatch`]
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<ExecutionResults> {
+        let plan = PipelinePlanner::new(plan, context).build()?;
+        Ok(self.schedule_plan(plan))
+    }
+
+    /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`].
+    pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults 
{
+        spawn_plan(plan, self.spawner())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Formats a panic message for a worker
+fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
+    let maybe_idx = rayon::current_thread_index();
+    let worker: &dyn std::fmt::Display = match &maybe_idx {
+        Some(idx) => idx,
+        None => &"UNKNOWN",
+    };
+
+    let message = if let Some(msg) = panic.downcast_ref::<&str>() {
+        *msg
+    } else if let Some(msg) = panic.downcast_ref::<String>() {
+        msg.as_str()
+    } else {
+        "UNKNOWN"
+    };
+
+    format!("worker {} panicked with: {}", worker, message)
+}
+
+/// Returns `true` if the current thread is a rayon worker thread
+///
+/// Note: if there are multiple rayon pools, this will return `true` if the 
current thread
+/// belongs to ANY rayon pool, even if this isn't a worker thread of a 
[`Scheduler`] instance
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,
+/// `spawn_local` will push to the front of the current worker's queue, workers pop tasks from
+/// the front of their queue, and steal tasks from the back of other workers' queues
+///
+/// The effect is that tasks spawned using `spawn_local` will typically be prioritised in
+/// a LIFO order, however, this should not be relied upon
+fn spawn_local(task: Task) {
+    // Verify that this is a worker thread, to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn(|| task.do_work())
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool with FIFO ordering
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,
+/// `spawn_local_fifo` will append to the back of the current worker's queue, workers pop tasks
+/// from the front of their queue, and steal tasks from the back of other workers' queues
+///
+/// The effect is that tasks spawned using `spawn_local_fifo` will typically be prioritised
+/// in a FIFO order, however, this should not be relied upon
+fn spawn_local_fifo(task: Task) {
+    // Verify that this is a worker thread, to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn_fifo(|| task.do_work())
+}
+
+#[derive(Debug, Clone)]
+pub struct Spawner {
+    pool: Arc<ThreadPool>,
+}
+
+impl Spawner {
+    pub fn spawn(&self, task: Task) {
+        debug!("Spawning {:?} to any worker", task);
+        self.pool.spawn(move || task.do_work());
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::util::pretty::pretty_format_batches;
+    use std::ops::Range;
+    use std::panic::panic_any;
+
+    use futures::{StreamExt, TryStreamExt};
+    use log::info;
+    use rand::distributions::uniform::SampleUniform;
+    use rand::{thread_rng, Rng};
+
+    use crate::arrow::array::{ArrayRef, PrimitiveArray};
+    use crate::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type};
+    use crate::arrow::record_batch::RecordBatch;
+    use crate::datasource::{MemTable, TableProvider};
+    use crate::physical_plan::displayable;
+    use crate::prelude::{SessionConfig, SessionContext};
+
+    use super::*;
+
+    fn generate_primitive<T, R>(
+        rng: &mut R,
+        len: usize,
+        valid_percent: f64,
+        range: Range<T::Native>,
+    ) -> ArrayRef
+    where
+        T: ArrowPrimitiveType,
+        T::Native: SampleUniform,
+        R: Rng,
+    {
+        Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+            rng.gen_bool(valid_percent)
+                .then(|| rng.gen_range(range.clone()))
+        })))
+    }
+
+    fn generate_batch<R: Rng>(
+        rng: &mut R,
+        row_count: usize,
+        id_offset: i32,
+    ) -> RecordBatch {
+        let id_range = id_offset..(row_count as i32 + id_offset);
+        let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 
0..1000);
+        let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0. 
..1000.);
+        let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range);
+
+        RecordBatch::try_from_iter_with_nullable([
+            ("a", a, true),
+            ("b", b, true),
+            ("id", Arc::new(id), false),
+        ])
+        .unwrap()
+    }
+
+    const BATCHES_PER_PARTITION: usize = 20;
+    const ROWS_PER_BATCH: usize = 100;
+    const NUM_PARTITIONS: usize = 2;
+
+    fn make_batches() -> Vec<Vec<RecordBatch>> {
+        let mut rng = thread_rng();
+
+        let mut id_offset = 0;
+
+        (0..NUM_PARTITIONS)
+            .map(|_| {
+                (0..BATCHES_PER_PARTITION)
+                    .map(|_| {
+                        let batch = generate_batch(&mut rng, ROWS_PER_BATCH, 
id_offset);
+                        id_offset += ROWS_PER_BATCH as i32;
+                        batch
+                    })
+                    .collect()
+            })
+            .collect()
+    }
+
+    fn make_provider() -> Arc<dyn TableProvider> {
+        let batches = make_batches();
+        let schema = batches.first().unwrap().first().unwrap().schema();
+        Arc::new(MemTable::try_new(schema, make_batches()).unwrap())
+    }
+
+    fn init_logging() {
+        let _ = env_logger::builder().is_test(true).try_init();
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+
+        context.register_table("table1", make_provider()).unwrap();
+        context.register_table("table2", make_provider()).unwrap();
+
+        let queries = [
+            "select * from table1 order by id",
+            "select * from table1 where table1.a > 100 order by id",
+            "select distinct a from table1 where table1.b > 100 order by a",
+            "select * from table1 join table2 on table1.id = table2.id order 
by table1.id",
+            "select id from table1 union all select id from table2 order by 
id",
+            "select id from table1 union all select id from table2 where a > 
100 order by id",
+            "select id, b from (select id, b from table1 union all select id, 
b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
+            "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order 
by id",
+            "select count(*) from table1 where table1.a > 4",

Review Comment:
   It would be helpful to include one or more SQL queries, together with the 
pipelines they are broken into, in the module-level doc above.



##########
datafusion/core/src/scheduler/pipeline/mod.rs:
##########
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::error::Result;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing 
output
+/// to one or more output partitions. As a [`Pipeline`] may draw on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by 
both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.

Review Comment:
   Pipeline inputs are identified by both an upstream index and a partition 
index, whereas pipeline outputs are identified only by a partition index.
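
A small sketch of the addressing scheme this comment refers to, assuming the `OutputLink` shape from `plan.rs` in this PR. `Destination` and `route` are hypothetical helpers for illustration only: a pipeline's output is addressed by partition alone, and it becomes a `(pipeline, child, partition)` triple on the input side of the downstream pipeline.

```rust
/// Mirrors the `OutputLink` struct from `plan.rs` in this PR.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct OutputLink {
    pub pipeline: usize,
    pub child: usize,
}

/// Hypothetical helper type: where a produced batch should be delivered.
#[derive(Debug, PartialEq)]
pub enum Destination {
    /// Input of another pipeline: needs both a child and a partition index.
    PipelineInput {
        pipeline: usize,
        child: usize,
        partition: usize,
    },
    /// Final query output: a partition index is enough.
    QueryOutput { partition: usize },
}

/// Resolves where output `partition` of pipeline `from` should go, given the
/// optional `OutputLink` of every pipeline (indexed by pipeline).
pub fn route(outputs: &[Option<OutputLink>], from: usize, partition: usize) -> Destination {
    match outputs[from] {
        Some(link) => Destination::PipelineInput {
            pipeline: link.pipeline,
            child: link.child,
            partition,
        },
        None => Destination::QueryOutput { partition },
    }
}
```

The partition index is carried through unchanged here, which matches how the task code in this PR passes `partition` straight to `push(batch, link.child, partition)`.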



##########
datafusion/core/src/scheduler/plan.rs:
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::SchemaRef;
+use std::sync::Arc;
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::{ExecutionPlan, Partitioning};
+
+use crate::scheduler::pipeline::{
+    execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
+};
+
+/// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct OutputLink {
+    /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to
+    pub pipeline: usize,
+
+    /// The child of the [`Pipeline`] to route output to
+    pub child: usize,
+}
+
+/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send 
its output
+#[derive(Debug)]
+pub struct RoutablePipeline {
+    /// The pipeline that produces data
+    pub pipeline: Box<dyn Pipeline>,
+
+    /// Where to send the output of `pipeline`
+    ///
+    /// If `None`, the output should be sent to the query output
+    pub output: Option<OutputLink>,
+}
+
+/// [`PipelinePlan`] is the scheduler's representation of the [`ExecutionPlan`] passed to
+/// [`super::Scheduler::schedule`]. It combines the list of [`Pipeline`] with the information
+/// necessary to route output from one stage to the next
+#[derive(Debug)]
+pub struct PipelinePlan {
+    /// Schema of this plan's output
+    pub schema: SchemaRef,
+
+    /// Number of output partitions
+    pub output_partitions: usize,
+
+    /// Pipelines that comprise this plan
+    pub pipelines: Vec<RoutablePipeline>,
+}
+
+/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group
+/// together multiple operators; [`OperatorGroup`] stores this state
+struct OperatorGroup {
+    /// Where to route the output of the eventual [`Pipeline`]
+    output: Option<OutputLink>,
+
+    /// The [`ExecutionPlan`] from which to start recursing
+    root: Arc<dyn ExecutionPlan>,
+
+    /// The number of times to recurse into the [`ExecutionPlan`]'s children
+    depth: usize,
+}
+
+/// A utility struct to assist converting from [`ExecutionPlan`] to [`PipelinePlan`]
+///
+/// The [`ExecutionPlan`] is visited in a depth-first fashion, gradually building
+/// up the [`RoutablePipeline`] for the [`PipelinePlan`]. As nodes are visited depth-first,
+/// a node is visited only after its parent has been.
+pub struct PipelinePlanner {
+    task_context: Arc<TaskContext>,
+
+    /// The schema of this plan
+    schema: SchemaRef,
+
+    /// The number of output partitions of this plan
+    output_partitions: usize,
+
+    /// The current list of completed pipelines
+    completed: Vec<RoutablePipeline>,
+
+    /// A list of [`ExecutionPlan`] still to visit, along with
+    /// where they should route their output
+    to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>,
+
+    /// Stores one or more operators to combine
+    /// together into a single [`ExecutionPipeline`]
+    execution_operators: Option<OperatorGroup>,
+}
+
+impl PipelinePlanner {
+    pub fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>) -> Self {
+        let schema = plan.schema();
+        let output_partitions = plan.output_partitioning().partition_count();
+        Self {
+            completed: vec![],
+            to_visit: vec![(plan, None)],
+            task_context,
+            execution_operators: None,
+            schema,
+            output_partitions,
+        }
+    }
+
+    /// Flush the current group of operators stored in `execution_operators`
+    /// into a single [`ExecutionPipeline`]
+    fn flush_exec(&mut self) -> Result<usize> {
+        let group = self.execution_operators.take().unwrap();
+        let node_idx = self.completed.len();
+        self.completed.push(RoutablePipeline {
+            pipeline: Box::new(ExecutionPipeline::new(
+                group.root,
+                self.task_context.clone(),
+                group.depth,
+            )?),
+            output: group.output,
+        });
+        Ok(node_idx)
+    }
+
+    /// Visit a non-special cased [`ExecutionPlan`]
+    fn visit_exec(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        parent: Option<OutputLink>,
+    ) -> Result<()> {
+        let children = plan.children();
+
+        // Add the operator to the current group of operators to be combined
+        // into a single [`ExecutionPipeline`].
+        //
+        // TODO: More sophisticated policy, just because we can combine them doesn't mean we should

Review Comment:
   The policy will be best combined with a new push-aware ExecutionPlan API.
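
   For context, the greedy grouping that the TODO refers to can be illustrated with a toy model. This is a sketch under stated assumptions, not the PR's planner: `Node`, `Link`, `Group`, the `breaker` flag and `plan` are hypothetical stand-ins for `ExecutionPlan`, `OutputLink`, `OperatorGroup` and `PipelinePlanner`. The assumed policy is to absorb single-child chains into one group and start a new group at any node marked as a breaker (for example a repartition).

```rust
/// Toy plan node; a hypothetical stand-in for `Arc<dyn ExecutionPlan>`.
#[derive(Debug)]
struct Node {
    name: &'static str,
    /// Marks operators that must form their own pipeline (e.g. a repartition).
    breaker: bool,
    children: Vec<Node>,
}

/// Where a group sends its output: the consuming group and its child slot.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Link {
    pipeline: usize,
    child: usize,
}

/// A run of operators combined into a single pipeline.
#[derive(Debug, PartialEq)]
struct Group {
    operators: Vec<&'static str>,
    output: Option<Link>,
}

/// Depth-first conversion using an explicit stack. A node is visited only
/// after its parent, so the index of the consuming group is always known.
fn plan(root: &Node) -> Vec<Group> {
    let mut completed: Vec<Group> = Vec::new();
    let mut to_visit: Vec<(&Node, Option<Link>)> = vec![(root, None)];

    while let Some((start, output)) = to_visit.pop() {
        let idx = completed.len();
        let mut operators = vec![start.name];
        let mut node = start;

        // Greedy policy: keep absorbing the only child unless either side is a breaker.
        while node.children.len() == 1 && !node.breaker && !node.children[0].breaker {
            node = &node.children[0];
            operators.push(node.name);
        }

        // Remaining children start new groups that route their output to this one.
        for (child, n) in node.children.iter().enumerate() {
            to_visit.push((n, Some(Link { pipeline: idx, child })));
        }
        completed.push(Group { operators, output });
    }
    completed
}
```

   With a chain of projection, filter, repartition and scan, this yields three groups: projection plus filter, the repartition alone, and the scan. A less greedy policy would change only the condition of the inner `while` loop, which is where a push-aware API could supply per-operator hints.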



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
