This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new dc76ec1a8 Introduce new optional scheduler, using Morsel-driven 
Parallelism + rayon (#2199) (#2226)
dc76ec1a8 is described below

commit dc76ec1a83c264bdaebdab95a63daa71f2cae743
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed May 4 15:12:16 2022 +0100

    Introduce new optional scheduler, using Morsel-driven Parallelism + rayon 
(#2199) (#2226)
    
    * Morsel-driven Parallelism using rayon (#2199)
    
    * Fix LIFO spawn ordering
    
    * Further docs for ExecutionPipeline
    
    * Deduplicate concurrent wakes
    
    * Add license headers
    
    * Sort Cargo.toml
    
    * Revert accidental change to ParquetExec
    
    * Handle wakeups triggered by other threads
    
    * Use SeqCst memory ordering
    
    * Review feedback
    
    * Add panic handler
    
    * Cleanup structs
    
    Add test of tokio interoperation
    
    * Review feedback
    
    * Use BatchPartitioner
    
    Cleanup error handling
    
    * Clarify shutdown characteristics
    
    * Fix racy test_panic
    
    * Don't overload Query nomenclature
    
    * Rename QueryResults to ExecutionResults
    
    * Further review feedback
    
    * Merge scheduler into datafusion/core
    
    * Review feedback
    
    * Fix partitioned execution
    
    * Format
    
    * Format Cargo.toml
    
    * Fix doc link
---
 CONTRIBUTING.md                                    |   2 +-
 Cargo.toml                                         |   2 +-
 datafusion/core/Cargo.toml                         |   9 +-
 datafusion/core/benches/parquet_query_sql.rs       |  65 ++-
 datafusion/core/src/lib.rs                         |   2 +
 datafusion/core/src/scheduler/mod.rs               | 454 +++++++++++++++++++
 .../core/src/scheduler/pipeline/execution.rs       | 330 ++++++++++++++
 datafusion/core/src/scheduler/pipeline/mod.rs      | 110 +++++
 .../core/src/scheduler/pipeline/repartition.rs     | 157 +++++++
 datafusion/core/src/scheduler/plan.rs              | 296 ++++++++++++
 datafusion/core/src/scheduler/task.rs              | 497 +++++++++++++++++++++
 11 files changed, 1905 insertions(+), 19 deletions(-)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index ab3381dff..4f0fe7163 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -150,7 +150,7 @@ The parquet SQL benchmarks can be run with
  cargo bench --bench parquet_query_sql
 ```
 
-These randomly generate a parquet file, and then benchmark queries sourced 
from [parquet_query_sql.sql](./datafusion/benches/parquet_query_sql.sql) 
against it. This can therefore be a quick way to add coverage of particular 
query and/or data paths.
+These randomly generate a parquet file, and then benchmark queries sourced 
from [parquet_query_sql.sql](./datafusion/core/benches/parquet_query_sql.sql) 
against it. This can therefore be a quick way to add coverage of particular 
query and/or data paths.
 
 If the environment variable `PARQUET_FILE` is set, the benchmark will run 
queries against this file instead of a randomly generated one. This can be 
useful for performing multiple runs, potentially with different code, against 
the same source data, or for testing against a custom dataset.
 
diff --git a/Cargo.toml b/Cargo.toml
index fefd5679a..297b394b4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,8 +17,8 @@
 
 [workspace]
 members = [
-    "datafusion/core",
     "datafusion/common",
+    "datafusion/core",
     "datafusion/expr",
     "datafusion/jit",
     "datafusion/physical-expr",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 551d3e59b..6dc8acb3d 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion";
 readme = "../README.md"
 authors = ["Apache Arrow <[email protected]>"]
 license = "Apache-2.0"
-keywords = [ "arrow", "query", "sql" ]
+keywords = ["arrow", "query", "sql"]
 include = [
     "benches/*.rs",
     "src/**/*.rs",
@@ -50,6 +50,8 @@ pyarrow = ["pyo3", "arrow/pyarrow", 
"datafusion-common/pyarrow"]
 regex_expressions = ["datafusion-physical-expr/regex_expressions"]
 # Used to enable row format experiment
 row = ["datafusion-row"]
+# Used to enable scheduler
+scheduler = ["rayon"]
 simd = ["arrow/simd"]
 unicode_expressions = ["datafusion-physical-expr/regex_expressions"]
 
@@ -75,9 +77,10 @@ ordered-float = "3.0"
 parking_lot = "0.12"
 parquet = { version = "13", features = ["arrow"] }
 paste = "^1.0"
-pin-project-lite= "^0.2.7"
+pin-project-lite = "^0.2.7"
 pyo3 = { version = "0.16", optional = true }
 rand = "0.8"
+rayon = { version = "1.5", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 sqlparser = "0.16"
 tempfile = "3"
@@ -88,6 +91,7 @@ uuid = { version = "1.0", features = ["v4"] }
 [dev-dependencies]
 criterion = "0.3"
 doc-comment = "0.3"
+env_logger = "0.9"
 fuzz-utils = { path = "fuzz-utils" }
 
 [[bench]]
@@ -121,6 +125,7 @@ name = "physical_plan"
 [[bench]]
 harness = false
 name = "parquet_query_sql"
+required-features = ["scheduler"]
 
 [[bench]]
 harness = false
diff --git a/datafusion/core/benches/parquet_query_sql.rs 
b/datafusion/core/benches/parquet_query_sql.rs
index 08156fad4..26a041fb4 100644
--- a/datafusion/core/benches/parquet_query_sql.rs
+++ b/datafusion/core/benches/parquet_query_sql.rs
@@ -24,7 +24,9 @@ use arrow::datatypes::{
 };
 use arrow::record_batch::RecordBatch;
 use criterion::{criterion_group, criterion_main, Criterion};
-use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::scheduler::Scheduler;
+use futures::stream::StreamExt;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{WriterProperties, WriterVersion};
 use rand::distributions::uniform::SampleUniform;
@@ -37,7 +39,6 @@ use std::path::Path;
 use std::sync::Arc;
 use std::time::Instant;
 use tempfile::NamedTempFile;
-use tokio_stream::StreamExt;
 
 /// The number of batches to write
 const NUM_BATCHES: usize = 2048;
@@ -193,15 +194,24 @@ fn criterion_benchmark(c: &mut Criterion) {
     assert!(Path::new(&file_path).exists(), "path not found");
     println!("Using parquet file {}", file_path);
 
-    let context = SessionContext::new();
+    let partitions = 4;
+    let config = SessionConfig::new().with_target_partitions(partitions);
+    let context = SessionContext::with_config(config);
 
-    let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
-    rt.block_on(context.register_parquet(
-        "t",
-        file_path.as_str(),
-        ParquetReadOptions::default(),
-    ))
-    .unwrap();
+    let scheduler = Scheduler::new(partitions);
+
+    let local_rt = tokio::runtime::Builder::new_current_thread()
+        .build()
+        .unwrap();
+
+    let query_rt = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(partitions)
+        .build()
+        .unwrap();
+
+    local_rt
+        .block_on(context.register_parquet("t", file_path.as_str(), 
Default::default()))
+        .unwrap();
 
     // We read the queries from a file so they can be changed without 
recompiling the benchmark
     let mut queries_file = 
File::open("benches/parquet_query_sql.sql").unwrap();
@@ -220,17 +230,42 @@ fn criterion_benchmark(c: &mut Criterion) {
             continue;
         }
 
-        let query = query.as_str();
-        c.bench_function(query, |b| {
+        c.bench_function(&format!("tokio: {}", query), |b| {
             b.iter(|| {
+                let query = query.clone();
                 let context = context.clone();
-                rt.block_on(async move {
-                    let query = context.sql(query).await.unwrap();
+                let (sender, mut receiver) = 
futures::channel::mpsc::unbounded();
+
+                // Spawn work to a separate tokio thread pool
+                query_rt.spawn(async move {
+                    let query = context.sql(&query).await.unwrap();
                     let mut stream = query.execute_stream().await.unwrap();
-                    while criterion::black_box(stream.next().await).is_some() 
{}
+
+                    while let Some(next) = stream.next().await {
+                        sender.unbounded_send(next).unwrap();
+                    }
+                });
+
+                local_rt.block_on(async {
+                    while receiver.next().await.transpose().unwrap().is_some() 
{}
                 })
             });
         });
+
+        c.bench_function(&format!("scheduled: {}", query), |b| {
+            b.iter(|| {
+                let query = query.clone();
+                let context = context.clone();
+
+                local_rt.block_on(async {
+                    let query = context.sql(&query).await.unwrap();
+                    let plan = query.create_physical_plan().await.unwrap();
+                    let mut stream =
+                        scheduler.schedule(plan, context.task_ctx()).unwrap();
+                    while stream.next().await.transpose().unwrap().is_some() {}
+                });
+            });
+        });
     }
 
     // Temporary file must outlive the benchmarks, it is deleted when dropped
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 055a17f4e..c598d9a33 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -218,6 +218,8 @@ pub mod physical_optimizer;
 pub mod physical_plan;
 pub mod prelude;
 pub mod scalar;
+#[cfg(feature = "scheduler")]
+pub mod scheduler;
 pub mod sql;
 pub mod variable;
 
diff --git a/datafusion/core/src/scheduler/mod.rs 
b/datafusion/core/src/scheduler/mod.rs
new file mode 100644
index 000000000..a765ddf83
--- /dev/null
+++ b/datafusion/core/src/scheduler/mod.rs
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`Scheduler`] maintains a pool of dedicated worker threads on which
+//! query execution can be scheduled. This is based on the idea of 
[Morsel-Driven Parallelism]
+//! and is designed to decouple the execution parallelism from the parallelism 
expressed in
+//! the physical plan as partitions.
+//!
+//! # Implementation
+//!
+//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it 
up into smaller
+//! chunks called pipelines. Each pipeline may consist of one or more nodes 
from the
+//! [`ExecutionPlan`] tree.
+//!
+//! The scheduler then maintains a list of pending [`Task`], that identify a 
partition within
+//! a particular pipeline that may be able to make progress on some "morsel" 
of data. These
+//! [`Task`] are then scheduled on the worker pool, with a preference for 
scheduling work
+//! on a given "morsel" on the same thread that produced it.
+//!
+//! # Rayon
+//!
+//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a 
lightweight, work-stealing
+//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this 
fact, and use [rayon]'s
+//! structured concurrency primitives to express additional parallelism that 
may be exploited
+//! if there are idle threads available at runtime
+//!
+//! # Shutdown
+//!
+//! Queries scheduled on a [`Scheduler`] will run to completion even if the
+//! [`Scheduler`] is dropped
+//!
+//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+//! [rayon]: https://docs.rs/rayon/latest/rayon/
+//!
+//! # Example
+//!
+//! ```rust
+//! # use futures::TryStreamExt;
+//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+//! # use datafusion_scheduler::Scheduler;
+//!
+//! # #[tokio::main]
+//! # async fn main() {
+//! let scheduler = Scheduler::new(4);
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let context = SessionContext::with_config(config);
+//!
+//! context.register_csv("example", "../core/tests/example.csv", 
CsvReadOptions::new()).await.unwrap();
+//! let plan = context.sql("SELECT MIN(b) FROM example")
+//!     .await
+//!    .unwrap()
+//!    .create_physical_plan()
+//!    .await
+//!    .unwrap();
+//!
+//! let task = context.task_ctx();
+//! let stream = scheduler.schedule(plan, task).unwrap();
+//! let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+//! # }
+//! ```
+//!
+
+use std::sync::Arc;
+
+use futures::stream::BoxStream;
+use log::{debug, error};
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::ExecutionPlan;
+
+use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline};
+use task::{spawn_plan, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+pub use task::ExecutionResults;
+
+mod pipeline;
+mod plan;
+mod task;
+
+/// Builder for a [`Scheduler`]
+#[derive(Debug)]
+pub struct SchedulerBuilder {
+    inner: ThreadPoolBuilder,
+}
+
+impl SchedulerBuilder {
+    /// Create a new [`SchedulerConfig`] with the provided number of threads
+    pub fn new(num_threads: usize) -> Self {
+        let builder = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .panic_handler(|p| error!("{}", format_worker_panic(p)))
+            .thread_name(|idx| format!("df-worker-{}", idx));
+
+        Self { inner: builder }
+    }
+
+    /// Registers a custom panic handler
+    #[cfg(test)]
+    fn panic_handler<H>(self, panic_handler: H) -> Self
+    where
+        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
+    {
+        Self {
+            inner: self.inner.panic_handler(panic_handler),
+        }
+    }
+
+    /// Build a new [`Scheduler`]
+    fn build(self) -> Scheduler {
+        Scheduler {
+            pool: Arc::new(self.inner.build().unwrap()),
+        }
+    }
+}
+
+/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a 
dedicated thread pool
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` new threads in a 
dedicated thread pool
+    pub fn new(num_threads: usize) -> Self {
+        SchedulerBuilder::new(num_threads).build()
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`ExecutionResults`] that can be used to receive results as 
they are produced,
+    /// as a [`futures::Stream`] of [`RecordBatch`]
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<ExecutionResults> {
+        let plan = PipelinePlanner::new(plan, context).build()?;
+        Ok(self.schedule_plan(plan))
+    }
+
+    /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`].
+    pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults 
{
+        spawn_plan(plan, self.spawner())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Formats a panic message for a worker
+fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
+    let maybe_idx = rayon::current_thread_index();
+    let worker: &dyn std::fmt::Display = match &maybe_idx {
+        Some(idx) => idx,
+        None => &"UNKNOWN",
+    };
+
+    let message = if let Some(msg) = panic.downcast_ref::<&str>() {
+        *msg
+    } else if let Some(msg) = panic.downcast_ref::<String>() {
+        msg.as_str()
+    } else {
+        "UNKNOWN"
+    };
+
+    format!("worker {} panicked with: {}", worker, message)
+}
+
+/// Returns `true` if the current thread is a rayon worker thread
+///
+/// Note: if there are multiple rayon pools, this will return `true` if the 
current thread
+/// belongs to ANY rayon pool, even if this isn't a worker thread of a 
[`Scheduler`] instance
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local workers thread pool
+///
+/// There is no guaranteed order of execution, as workers may steal at any 
time. However,
+/// `spawn_local` will append to the front of the current worker's queue, 
workers pop tasks from
+/// the front of their queue, and steal tasks from the back of other workers 
queues
+///
+/// The effect is that tasks spawned using `spawn_local` will typically be 
prioritised in
+/// a LIFO order, however, this should not be relied upon
+fn spawn_local(task: Task) {
+    // Verify is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn(|| task.do_work())
+}
+
+/// Spawn a [`Task`] onto the local workers thread pool with fifo ordering
+///
+/// There is no guaranteed order of execution, as workers may steal at any 
time. However,
+/// `spawn_local_fifo` will append to the back of the current worker's queue, 
workers pop tasks
+/// from the front of their queue, and steal tasks from the back of other 
workers queues
+///
+/// The effect is that tasks spawned using `spawn_local_fifo` will typically 
be prioritised
+/// in a FIFO order, however, this should not be relied upon
+fn spawn_local_fifo(task: Task) {
+    // Verify is a worker thread to avoid creating a global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn_fifo(|| task.do_work())
+}
+
+#[derive(Debug, Clone)]
+pub struct Spawner {
+    pool: Arc<ThreadPool>,
+}
+
+impl Spawner {
+    pub fn spawn(&self, task: Task) {
+        debug!("Spawning {:?} to any worker", task);
+        self.pool.spawn(move || task.do_work());
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::util::pretty::pretty_format_batches;
+    use std::ops::Range;
+    use std::panic::panic_any;
+
+    use futures::{StreamExt, TryStreamExt};
+    use log::info;
+    use rand::distributions::uniform::SampleUniform;
+    use rand::{thread_rng, Rng};
+
+    use crate::arrow::array::{ArrayRef, PrimitiveArray};
+    use crate::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type};
+    use crate::arrow::record_batch::RecordBatch;
+    use crate::datasource::{MemTable, TableProvider};
+    use crate::physical_plan::displayable;
+    use crate::prelude::{SessionConfig, SessionContext};
+
+    use super::*;
+
+    fn generate_primitive<T, R>(
+        rng: &mut R,
+        len: usize,
+        valid_percent: f64,
+        range: Range<T::Native>,
+    ) -> ArrayRef
+    where
+        T: ArrowPrimitiveType,
+        T::Native: SampleUniform,
+        R: Rng,
+    {
+        Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+            rng.gen_bool(valid_percent)
+                .then(|| rng.gen_range(range.clone()))
+        })))
+    }
+
+    fn generate_batch<R: Rng>(
+        rng: &mut R,
+        row_count: usize,
+        id_offset: i32,
+    ) -> RecordBatch {
+        let id_range = id_offset..(row_count as i32 + id_offset);
+        let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 
0..1000);
+        let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0. 
..1000.);
+        let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range);
+
+        RecordBatch::try_from_iter_with_nullable([
+            ("a", a, true),
+            ("b", b, true),
+            ("id", Arc::new(id), false),
+        ])
+        .unwrap()
+    }
+
+    const BATCHES_PER_PARTITION: usize = 20;
+    const ROWS_PER_BATCH: usize = 100;
+    const NUM_PARTITIONS: usize = 2;
+
+    fn make_batches() -> Vec<Vec<RecordBatch>> {
+        let mut rng = thread_rng();
+
+        let mut id_offset = 0;
+
+        (0..NUM_PARTITIONS)
+            .map(|_| {
+                (0..BATCHES_PER_PARTITION)
+                    .map(|_| {
+                        let batch = generate_batch(&mut rng, ROWS_PER_BATCH, 
id_offset);
+                        id_offset += ROWS_PER_BATCH as i32;
+                        batch
+                    })
+                    .collect()
+            })
+            .collect()
+    }
+
+    fn make_provider() -> Arc<dyn TableProvider> {
+        let batches = make_batches();
+        let schema = batches.first().unwrap().first().unwrap().schema();
+        Arc::new(MemTable::try_new(schema, make_batches()).unwrap())
+    }
+
+    fn init_logging() {
+        let _ = env_logger::builder().is_test(true).try_init();
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+
+        context.register_table("table1", make_provider()).unwrap();
+        context.register_table("table2", make_provider()).unwrap();
+
+        let queries = [
+            "select * from table1 order by id",
+            "select * from table1 where table1.a > 100 order by id",
+            "select distinct a from table1 where table1.b > 100 order by a",
+            "select * from table1 join table2 on table1.id = table2.id order 
by table1.id",
+            "select id from table1 union all select id from table2 order by 
id",
+            "select id from table1 union all select id from table2 where a > 
100 order by id",
+            "select id, b from (select id, b from table1 union all select id, 
b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
+            "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order 
by id",
+            "select count(*) from table1 where table1.a > 4",
+        ];
+
+        for sql in queries {
+            let task = context.task_ctx();
+
+            let query = context.sql(sql).await.unwrap();
+
+            let plan = query.create_physical_plan().await.unwrap();
+
+            info!("Plan: {}", displayable(plan.as_ref()).indent());
+
+            let stream = scheduler.schedule(plan, task).unwrap().stream();
+            let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+            let expected = query.collect().await.unwrap();
+
+            let total_expected = expected.iter().map(|x| 
x.num_rows()).sum::<usize>();
+            let total_scheduled = scheduled.iter().map(|x| 
x.num_rows()).sum::<usize>();
+            assert_eq!(total_expected, total_scheduled);
+
+            info!("Query \"{}\" produced {} rows", sql, total_expected);
+
+            let expected = 
pretty_format_batches(&expected).unwrap().to_string();
+            let scheduled = 
pretty_format_batches(&scheduled).unwrap().to_string();
+
+            assert_eq!(
+                expected, scheduled,
+                "\n\nexpected:\n\n{}\nactual:\n\n{}\n\n",
+                expected, scheduled
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_partitioned() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+        let plan = context
+            .read_table(make_provider())
+            .unwrap()
+            .create_physical_plan()
+            .await
+            .unwrap();
+
+        assert_eq!(plan.output_partitioning().partition_count(), 
NUM_PARTITIONS);
+
+        let results = scheduler
+            .schedule(plan.clone(), context.task_ctx())
+            .unwrap();
+
+        let batches = results.stream().try_collect::<Vec<_>>().await.unwrap();
+        assert_eq!(batches.len(), NUM_PARTITIONS * BATCHES_PER_PARTITION);
+
+        for batch in batches {
+            assert_eq!(batch.num_rows(), ROWS_PER_BATCH)
+        }
+
+        let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
+        let streams = results.stream_partitioned();
+
+        let partitions: Vec<Vec<_>> =
+            futures::future::try_join_all(streams.into_iter().map(|s| 
s.try_collect()))
+                .await
+                .unwrap();
+
+        assert_eq!(partitions.len(), NUM_PARTITIONS);
+        for batches in partitions {
+            assert_eq!(batches.len(), BATCHES_PER_PARTITION);
+            for batch in batches {
+                assert_eq!(batch.num_rows(), ROWS_PER_BATCH);
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_panic() {
+        init_logging();
+
+        let do_test = |scheduler: Scheduler| {
+            scheduler.pool.spawn(|| panic!("test"));
+            scheduler.pool.spawn(|| panic!("{}", 1));
+            scheduler.pool.spawn(|| panic_any(21));
+        };
+
+        // The default panic handler should log panics and not abort the 
process
+        do_test(Scheduler::new(1));
+
+        // Override panic handler and capture panics to test formatting
+        let (sender, receiver) = futures::channel::mpsc::unbounded();
+        let scheduler = SchedulerBuilder::new(1)
+            .panic_handler(move |panic| {
+                let _ = sender.unbounded_send(format_worker_panic(panic));
+            })
+            .build();
+
+        do_test(scheduler);
+
+        // Sort as order not guaranteed
+        let mut buffer: Vec<_> = receiver.collect().await;
+        buffer.sort_unstable();
+
+        assert_eq!(buffer.len(), 3);
+        assert_eq!(buffer[0], "worker 0 panicked with: 1");
+        assert_eq!(buffer[1], "worker 0 panicked with: UNKNOWN");
+        assert_eq!(buffer[2], "worker 0 panicked with: test");
+    }
+}
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs 
b/datafusion/core/src/scheduler/pipeline/execution.rs
new file mode 100644
index 000000000..baf487d98
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/execution.rs
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+use arrow::error::ArrowError;
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::MetricsSet;
+use crate::physical_plan::{
+    displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+
+use crate::scheduler::pipeline::Pipeline;
+use crate::scheduler::BoxStream;
+
+/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
+/// converts it to the push-based [`Pipeline`] interface
+///
+/// Internally [`ExecutionPipeline`] is still pull-based which limits its 
parallelism
+/// to that of its output partitioning, however, it provides full 
compatibility with
+/// [`ExecutionPlan`] allowing full interoperability with the existing 
ecosystem
+///
+/// Longer term we will likely want to introduce new traits that differentiate 
between
+/// pipeline-able operators like filters, and pipeline-breakers like 
aggregations, and
+/// are better aligned with a push-based execution model.
+///
+/// This in turn will allow for [`Pipeline`] implementations that are able to 
introduce
+/// parallelism beyond that expressed in their partitioning
+pub struct ExecutionPipeline {
+    proxied: Arc<dyn ExecutionPlan>,
+    inputs: Vec<Vec<Arc<Mutex<InputPartition>>>>,
+    outputs: Vec<Mutex<BoxStream<'static, ArrowResult<RecordBatch>>>>,
+}
+
+impl std::fmt::Debug for ExecutionPipeline {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let tree = debug_tree(self.proxied.as_ref());
+        f.debug_tuple("ExecutionNode").field(&tree).finish()
+    }
+}
+
+impl ExecutionPipeline {
+    pub fn new(
+        plan: Arc<dyn ExecutionPlan>,
+        task_context: Arc<TaskContext>,
+        depth: usize,
+    ) -> Result<Self> {
+        // The point in the plan at which to splice the plan graph
+        let mut splice_point = plan;
+        let mut parent_plans = Vec::with_capacity(depth.saturating_sub(1));
+        for _ in 0..depth {
+            let children = splice_point.children();
+            assert_eq!(
+                children.len(),
+                1,
+                "can only group through nodes with a single child"
+            );
+            parent_plans.push(splice_point);
+            splice_point = children.into_iter().next().unwrap();
+        }
+
+        // The children to replace with [`ProxyExecutionPlan`]
+        let children = splice_point.children();
+        let mut inputs = Vec::with_capacity(children.len());
+
+        // The spliced plan with its children replaced with 
[`ProxyExecutionPlan`]
+        let spliced = if !children.is_empty() {
+            let mut proxies: Vec<Arc<dyn ExecutionPlan>> =
+                Vec::with_capacity(children.len());
+
+            for child in children {
+                let count = child.output_partitioning().partition_count();
+
+                let mut child_inputs = Vec::with_capacity(count);
+                for _ in 0..count {
+                    child_inputs.push(Default::default())
+                }
+
+                inputs.push(child_inputs.clone());
+                proxies.push(Arc::new(ProxyExecutionPlan {
+                    inner: child,
+                    inputs: child_inputs,
+                }));
+            }
+
+            splice_point.with_new_children(proxies)?
+        } else {
+            splice_point.clone()
+        };
+
+        // Reconstruct the parent graph
+        let mut proxied = spliced;
+        for parent in parent_plans.into_iter().rev() {
+            proxied = parent.with_new_children(vec![proxied])?
+        }
+
+        // Construct the output streams
+        let output_count = proxied.output_partitioning().partition_count();
+        let outputs = (0..output_count)
+            .map(|x| {
+                let proxy_captured = proxied.clone();
+                let task_captured = task_context.clone();
+                let fut = async move {
+                    proxy_captured
+                        .execute(x, task_captured)
+                        .await
+                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                };
+
+                // Use futures::stream::once to handle operators that perform 
computation
+                // within `ExecutionPlan::execute`. If we evaluated these 
futures here
+                // we could potentially block indefinitely waiting for inputs 
that will
+                // never arrive as the query isn't scheduled yet
+                Mutex::new(futures::stream::once(fut).try_flatten().boxed())
+            })
+            .collect();
+
+        Ok(Self {
+            proxied,
+            inputs,
+            outputs,
+        })
+    }
+}
+
+impl Pipeline for ExecutionPipeline {
+    /// Push a [`RecordBatch`] to the given input partition
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> 
Result<()> {
+        let mut partition = self.inputs[child][partition].lock();
+        assert!(!partition.is_closed);
+
+        partition.buffer.push_back(input);
+        for waker in partition.wait_list.drain(..) {
+            waker.wake()
+        }
+        Ok(())
+    }
+
+    fn close(&self, child: usize, partition: usize) {
+        let mut partition = self.inputs[child][partition].lock();
+        assert!(!partition.is_closed);
+
+        partition.is_closed = true;
+        for waker in partition.wait_list.drain(..) {
+            waker.wake()
+        }
+    }
+
+    fn output_partitions(&self) -> usize {
+        self.outputs.len()
+    }
+
+    /// Poll an output partition, attempting to get its output
+    fn poll_partition(
+        &self,
+        cx: &mut Context<'_>,
+        partition: usize,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        self.outputs[partition]
+            .lock()
+            .poll_next_unpin(cx)
+            .map(|opt| opt.map(|r| r.map_err(Into::into)))
+    }
+}
+
+#[derive(Debug, Default)]
+struct InputPartition {
+    buffer: VecDeque<RecordBatch>,
+    wait_list: Vec<Waker>,
+    is_closed: bool,
+}
+
+struct InputPartitionStream {
+    schema: SchemaRef,
+    partition: Arc<Mutex<InputPartition>>,
+}
+
+impl Stream for InputPartitionStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        let mut partition = self.partition.lock();
+        match partition.buffer.pop_front() {
+            Some(batch) => Poll::Ready(Some(Ok(batch))),
+            None if partition.is_closed => Poll::Ready(None),
+            _ => {
+                partition.wait_list.push(cx.waker().clone());
+                Poll::Pending
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for InputPartitionStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// This is a hack that allows injecting [`InputPartitionStream`] in place of 
the
+/// streams yielded by the child of the wrapped [`ExecutionPlan`]
+///
+/// This is hopefully temporary pending reworking [`ExecutionPlan`]
+#[derive(Debug)]
+struct ProxyExecutionPlan {
+    inner: Arc<dyn ExecutionPlan>,
+
+    inputs: Vec<Arc<Mutex<InputPartition>>>,
+}
+
+#[async_trait]
+impl ExecutionPlan for ProxyExecutionPlan {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.inner.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.inner.output_ordering()
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        self.inner.required_child_distribution()
+    }
+
+    fn relies_on_input_order(&self) -> bool {
+        self.inner.relies_on_input_order()
+    }
+
+    fn maintains_input_order(&self) -> bool {
+        self.inner.maintains_input_order()
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        self.inner.benefits_from_input_partitioning()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(InputPartitionStream {
+            schema: self.schema(),
+            partition: self.inputs[partition].clone(),
+        }))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        self.inner.metrics()
+    }
+
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        write!(f, "ProxyExecutionPlan")
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.inner.statistics()
+    }
+}
+
+struct NodeDescriptor {
+    operator: String,
+    children: Vec<NodeDescriptor>,
+}
+
+impl std::fmt::Debug for NodeDescriptor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct(&self.operator)
+            .field("children", &self.children)
+            .finish()
+    }
+}
+
+fn debug_tree(plan: &dyn ExecutionPlan) -> NodeDescriptor {
+    let operator = format!("{}", displayable(plan).one_line());
+    let children = plan
+        .children()
+        .into_iter()
+        .map(|x| debug_tree(x.as_ref()))
+        .collect();
+
+    NodeDescriptor { operator, children }
+}
diff --git a/datafusion/core/src/scheduler/pipeline/mod.rs 
b/datafusion/core/src/scheduler/pipeline/mod.rs
new file mode 100644
index 000000000..824a6950e
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/mod.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::error::Result;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing 
output
+/// to one or more output partitions. As a [`Pipeline`] may drawn on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by 
both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan 
representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Eager vs Lazy Execution
+///
+/// Whether computation is eagerly done on push, or lazily done on pull, is
+/// intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and 
potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in 
[`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch 
to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output 
partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions 
into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, 
and wakes
+/// any output partitions that may now be able to make progress. This may be 
none if
+/// the operator is waiting on data from a different input partition
+///
+/// An aggregation pipeline which combines data from one or more input 
partitions into
+/// a single output partition. [`Pipeline::push`] would eagerly update the 
computed
+/// aggregates, and the final [`Pipeline::close`] trigger flushing these to 
the output.
+/// It would also be possible to flush once the partial aggregates reach a 
certain size
+///
+/// A partition-aware aggregation pipeline, which functions similarly to the 
above, but
+/// computes aggregations per input partition, before combining these prior to 
flush.
+///
+/// An async input pipeline, which has no inputs, and wakes the output 
partition
+/// whenever new data is available
+///
+/// A JIT compiled sequence of synchronous operators, that perform multiple 
operations
+/// from the physical plan as a single [`Pipeline`]. Parallelized 
implementations
+/// are also possible
+///
+pub trait Pipeline: Send + Sync + std::fmt::Debug {
+    /// Push a [`RecordBatch`] to the given input partition
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> 
Result<()>;
+
+    /// Mark an input partition as exhausted
+    fn close(&self, child: usize, partition: usize);
+
+    /// Returns the number of output partitions
+    fn output_partitions(&self) -> usize;
+
+    /// Attempt to pull out the next value of the given output partition, 
registering the
+    /// current task for wakeup if the value is not yet available, and 
returning `None`
+    /// if the output partition is exhausted and will never yield any further 
values
+    ///
+    /// # Return value
+    ///
+    /// There are several possible return values:
+    ///
+    /// - `Poll::Pending` indicates that this partition's next value is not 
ready yet.
+    /// Implementations should use the waker provided by `cx` to notify the 
scheduler when
+    /// progress may be able to be made
+    ///
+    /// - `Poll::Ready(Some(Ok(val)))` returns the next value from this output 
partition,
+    /// the output partition should be polled again as it may have further 
values. The returned
+    /// value will be routed to the next pipeline in the query
+    ///
+    /// - `Poll::Ready(Some(Err(e)))` returns an error that will be routed to 
the query's output
+    /// and the query execution aborted.
+    ///
+    /// - `Poll::Ready(None)` indicates that this partition is exhausted and 
will not produce any
+    /// further values.
+    ///
+    fn poll_partition(
+        &self,
+        cx: &mut Context<'_>,
+        partition: usize,
+    ) -> Poll<Option<Result<RecordBatch>>>;
+}
diff --git a/datafusion/core/src/scheduler/pipeline/repartition.rs 
b/datafusion/core/src/scheduler/pipeline/repartition.rs
new file mode 100644
index 000000000..c35ab909f
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/repartition.rs
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll, Waker};
+
+use parking_lot::Mutex;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::Result;
+use crate::physical_plan::repartition::BatchPartitioner;
+use crate::physical_plan::Partitioning;
+
+use crate::scheduler::pipeline::Pipeline;
+
+/// A [`Pipeline`] that can repartition its input
+#[derive(Debug)]
+pub struct RepartitionPipeline {
+    output_count: usize,
+    state: Mutex<RepartitionState>,
+}
+
+impl RepartitionPipeline {
+    /// Create a new [`RepartitionPipeline`] with the given `input` and 
`output` partitioning
+    pub fn try_new(input: Partitioning, output: Partitioning) -> Result<Self> {
+        let input_count = input.partition_count();
+        let output_count = output.partition_count();
+        assert_ne!(input_count, 0);
+        assert_ne!(output_count, 0);
+
+        // TODO: metrics support
+        let partitioner = BatchPartitioner::try_new(output, 
Default::default())?;
+
+        let state = Mutex::new(RepartitionState {
+            partitioner,
+            partition_closed: vec![false; input_count],
+            input_closed: false,
+            output_buffers: (0..output_count).map(|_| 
Default::default()).collect(),
+        });
+
+        Ok(Self {
+            state,
+            output_count,
+        })
+    }
+}
+
+struct RepartitionState {
+    partitioner: BatchPartitioner,
+    partition_closed: Vec<bool>,
+    input_closed: bool,
+    output_buffers: Vec<OutputBuffer>,
+}
+
+impl std::fmt::Debug for RepartitionState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RepartitionState")
+            .field("partition_closed", &self.partition_closed)
+            .field("input_closed", &self.input_closed)
+            .finish()
+    }
+}
+
+impl Pipeline for RepartitionPipeline {
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> 
Result<()> {
+        assert_eq!(child, 0);
+
+        let mut state = self.state.lock();
+        assert!(
+            !state.partition_closed[partition],
+            "attempt to push to closed partition {} of 
RepartitionPipeline({:?})",
+            partition, state
+        );
+
+        let state = &mut *state;
+        state.partitioner.partition(input, |partition, batch| {
+            state.output_buffers[partition].push_batch(batch);
+            Ok(())
+        })
+    }
+
+    fn close(&self, child: usize, partition: usize) {
+        assert_eq!(child, 0);
+
+        let mut state = self.state.lock();
+        assert!(
+            !state.partition_closed[partition],
+            "attempt to close already closed partition {} of 
RepartitionPipeline({:?})",
+            partition, state
+        );
+
+        state.partition_closed[partition] = true;
+
+        // If all input streams exhausted, wake outputs
+        if state.partition_closed.iter().all(|x| *x) {
+            state.input_closed = true;
+            for buffer in &mut state.output_buffers {
+                for waker in buffer.wait_list.drain(..) {
+                    waker.wake()
+                }
+            }
+        }
+    }
+
+    fn output_partitions(&self) -> usize {
+        self.output_count
+    }
+
+    fn poll_partition(
+        &self,
+        cx: &mut Context<'_>,
+        partition: usize,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        let mut state = self.state.lock();
+        let input_closed = state.input_closed;
+        let buffer = &mut state.output_buffers[partition];
+
+        match buffer.batches.pop_front() {
+            Some(batch) => Poll::Ready(Some(Ok(batch))),
+            None if input_closed => Poll::Ready(None),
+            _ => {
+                buffer.wait_list.push(cx.waker().clone());
+                Poll::Pending
+            }
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+struct OutputBuffer {
+    batches: VecDeque<RecordBatch>,
+    wait_list: Vec<Waker>,
+}
+
+impl OutputBuffer {
+    fn push_batch(&mut self, batch: RecordBatch) {
+        self.batches.push_back(batch);
+
+        for waker in self.wait_list.drain(..) {
+            waker.wake()
+        }
+    }
+}
diff --git a/datafusion/core/src/scheduler/plan.rs 
b/datafusion/core/src/scheduler/plan.rs
new file mode 100644
index 000000000..e7d5e1d33
--- /dev/null
+++ b/datafusion/core/src/scheduler/plan.rs
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::SchemaRef;
+use std::sync::Arc;
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::{ExecutionPlan, Partitioning};
+
+use crate::scheduler::pipeline::{
+    execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
+};
+
+/// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct OutputLink {
+    /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to
+    pub pipeline: usize,
+
+    /// The child of the [`Pipeline`] to route output to
+    pub child: usize,
+}
+
+/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send 
its output
+#[derive(Debug)]
+pub struct RoutablePipeline {
+    /// The pipeline that produces data
+    pub pipeline: Box<dyn Pipeline>,
+
+    /// Where to send output the output of `pipeline`
+    ///
+    /// If `None`, the output should be sent to the query output
+    pub output: Option<OutputLink>,
+}
+
+/// [`PipelinePlan`] is the scheduler's representation of the 
[`ExecutionPlan`] passed to
+/// [`super::Scheduler::schedule`]. It combines the list of [Pipeline`] with 
the information
+/// necessary to route output from one stage to the next
+#[derive(Debug)]
+pub struct PipelinePlan {
+    /// Schema of this plans output
+    pub schema: SchemaRef,
+
+    /// Number of output partitions
+    pub output_partitions: usize,
+
+    /// Pipelines that comprise this plan
+    pub pipelines: Vec<RoutablePipeline>,
+}
+
+/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group
+/// together multiple operators, [`OperatorGroup`] stores this state
+struct OperatorGroup {
+    /// Where to route the output of the eventual [`Pipeline`]
+    output: Option<OutputLink>,
+
+    /// The [`ExecutionPlan`] from which to start recursing
+    root: Arc<dyn ExecutionPlan>,
+
+    /// The number of times to recurse into the [`ExecutionPlan`]'s children
+    depth: usize,
+}
+
+/// A utility struct to assist converting from [`ExecutionPlan`] to 
[`PipelinePlan`]
+///
+/// The [`ExecutionPlan`] is visited in a depth-first fashion, gradually 
building
+/// up the [`RoutablePipeline`] for the [`PipelinePlan`]. As nodes are visited 
depth-first,
+/// a node is visited only after its parent has been.
+pub struct PipelinePlanner {
+    task_context: Arc<TaskContext>,
+
+    /// The schema of this plan
+    schema: SchemaRef,
+
+    /// The number of output partitions of this plan
+    output_partitions: usize,
+
+    /// The current list of completed pipelines
+    completed: Vec<RoutablePipeline>,
+
+    /// A list of [`ExecutionPlan`] still to visit, along with
+    /// where they should route their output
+    to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>,
+
+    /// Stores one or more operators to combine
+    /// together into a single [`ExecutionPipeline`]
+    execution_operators: Option<OperatorGroup>,
+}
+
+impl PipelinePlanner {
+    pub fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>) 
-> Self {
+        let schema = plan.schema();
+        let output_partitions = plan.output_partitioning().partition_count();
+        Self {
+            completed: vec![],
+            to_visit: vec![(plan, None)],
+            task_context,
+            execution_operators: None,
+            schema,
+            output_partitions,
+        }
+    }
+
+    /// Flush the current group of operators stored in `execution_operators`
+    /// into a single [`ExecutionPipeline]
+    fn flush_exec(&mut self) -> Result<usize> {
+        let group = self.execution_operators.take().unwrap();
+        let node_idx = self.completed.len();
+        self.completed.push(RoutablePipeline {
+            pipeline: Box::new(ExecutionPipeline::new(
+                group.root,
+                self.task_context.clone(),
+                group.depth,
+            )?),
+            output: group.output,
+        });
+        Ok(node_idx)
+    }
+
+    /// Visit a non-special cased [`ExecutionPlan`]
+    fn visit_exec(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        parent: Option<OutputLink>,
+    ) -> Result<()> {
+        let children = plan.children();
+
+        // Add the operator to the current group of operators to be combined
+        // into a single [`ExecutionPipeline`].
+        //
+        // TODO: More sophisticated policy, just because we can combine them 
doesn't mean we should
+        match self.execution_operators.as_mut() {
+            Some(buffer) => {
+                assert_eq!(parent, buffer.output, "QueryBuilder out of sync");
+                buffer.depth += 1;
+            }
+            None => {
+                self.execution_operators = Some(OperatorGroup {
+                    output: parent,
+                    root: plan,
+                    depth: 0,
+                })
+            }
+        }
+
+        match children.len() {
+            1 => {
+                // Enqueue the children with the parent of the `OperatorGroup`
+                self.to_visit
+                    .push((children.into_iter().next().unwrap(), parent))
+            }
+            _ => {
+                // We can only recursively group through nodes with a single 
child, therefore
+                // if this node has multiple children, we now need to flush 
the buffer and
+                // enqueue its children with this new pipeline as its parent
+                let node = self.flush_exec()?;
+                self.enqueue_children(children, node);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Add the given list of children to the stack of [`ExecutionPlan`] to 
visit
+    fn enqueue_children(
+        &mut self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+        parent_node_idx: usize,
+    ) {
+        for (child_idx, child) in children.into_iter().enumerate() {
+            self.to_visit.push((
+                child,
+                Some(OutputLink {
+                    pipeline: parent_node_idx,
+                    child: child_idx,
+                }),
+            ))
+        }
+    }
+
+    /// Push a new [`RoutablePipeline`] and enqueue its children to be visited
+    fn push_pipeline(
+        &mut self,
+        node: RoutablePipeline,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) {
+        let node_idx = self.completed.len();
+        self.completed.push(node);
+        self.enqueue_children(children, node_idx)
+    }
+
+    /// Push a new [`RepartitionPipeline`] first flushing any buffered 
[`OperatorGroup`]
+    fn push_repartition(
+        &mut self,
+        input: Partitioning,
+        output: Partitioning,
+        parent: Option<OutputLink>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<()> {
+        let parent = match &self.execution_operators {
+            Some(buffer) => {
+                assert_eq!(buffer.output, parent, "QueryBuilder out of sync");
+                Some(OutputLink {
+                    pipeline: self.flush_exec()?,
+                    child: 0, // Must be the only child
+                })
+            }
+            None => parent,
+        };
+
+        let node = Box::new(RepartitionPipeline::try_new(input, output)?);
+        self.push_pipeline(
+            RoutablePipeline {
+                pipeline: node,
+                output: parent,
+            },
+            children,
+        );
+        Ok(())
+    }
+
+    /// Visit an [`ExecutionPlan`] operator and add it to the [`PipelinePlan`] 
being built
+    fn visit_operator(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        parent: Option<OutputLink>,
+    ) -> Result<()> {
+        if let Some(repartition) = 
plan.as_any().downcast_ref::<RepartitionExec>() {
+            self.push_repartition(
+                repartition.input().output_partitioning(),
+                repartition.output_partitioning(),
+                parent,
+                repartition.children(),
+            )
+        } else if let Some(coalesce) =
+            plan.as_any().downcast_ref::<CoalescePartitionsExec>()
+        {
+            self.push_repartition(
+                coalesce.input().output_partitioning(),
+                Partitioning::RoundRobinBatch(1),
+                parent,
+                coalesce.children(),
+            )
+        } else {
+            self.visit_exec(plan, parent)
+        }
+    }
+
+    /// Build a [`PipelinePlan`] from the [`ExecutionPlan`] provided to 
[`PipelinePlanner::new`]
+    ///
+    /// This will group all operators possible into a single 
[`ExecutionPipeline`], only
+    /// creating new pipelines when:
+    ///
+    /// - encountering an operator with multiple children
+    /// - encountering a repartitioning operator
+    ///
+    /// This latter case is because currently the repartitioning operators in 
DataFusion are
+    /// coupled with the non-scheduler-based parallelism story
+    ///
+    /// The above logic is liable to change, is considered an implementation 
detail of the
+    /// scheduler, and should not be relied upon by operators
+    ///
+    pub fn build(mut self) -> Result<PipelinePlan> {
+        // We do a depth-first scan of the operator tree, extracting a list of 
[`QueryNode`]
+        while let Some((plan, parent)) = self.to_visit.pop() {
+            self.visit_operator(plan, parent)?;
+        }
+
+        if self.execution_operators.is_some() {
+            self.flush_exec()?;
+        }
+
+        Ok(PipelinePlan {
+            schema: self.schema,
+            output_partitions: self.output_partitions,
+            pipelines: self.completed,
+        })
+    }
+}
diff --git a/datafusion/core/src/scheduler/task.rs 
b/datafusion/core/src/scheduler/task.rs
new file mode 100644
index 000000000..e91985437
--- /dev/null
+++ b/datafusion/core/src/scheduler/task.rs
@@ -0,0 +1,497 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use crate::scheduler::{
+    is_worker, plan::PipelinePlan, spawn_local, spawn_local_fifo, 
RoutablePipeline,
+    Spawner,
+};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
+use arrow::record_batch::RecordBatch;
+use futures::channel::mpsc;
+use futures::task::ArcWake;
+use futures::{ready, Stream, StreamExt};
+use log::{debug, trace};
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
+
+/// Spawns a [`PipelinePlan`] using the provided [`Spawner`]
+pub fn spawn_plan(plan: PipelinePlan, spawner: Spawner) -> ExecutionResults {
+    debug!("Spawning pipeline plan: {:#?}", plan);
+
+    let (senders, receivers) = (0..plan.output_partitions)
+        .map(|_| mpsc::unbounded())
+        .unzip::<_, _, Vec<_>, Vec<_>>();
+
+    let context = Arc::new(ExecutionContext {
+        spawner,
+        pipelines: plan.pipelines,
+        schema: plan.schema,
+        output: senders,
+    });
+
+    for (pipeline_idx, query_pipeline) in context.pipelines.iter().enumerate() 
{
+        for partition in 0..query_pipeline.pipeline.output_partitions() {
+            context.spawner.spawn(Task {
+                context: context.clone(),
+                waker: Arc::new(TaskWaker {
+                    context: Arc::downgrade(&context),
+                    wake_count: AtomicUsize::new(1),
+                    pipeline: pipeline_idx,
+                    partition,
+                }),
+            });
+        }
+    }
+
+    let partitions = receivers
+        .into_iter()
+        .map(|receiver| ExecutionResultStream {
+            receiver: receiver,
+            context: context.clone(),
+        })
+        .collect();
+
+    ExecutionResults {
+        streams: partitions,
+        context,
+    }
+}
+
+/// A [`Task`] identifies an output partition within a given pipeline that may 
be able to
+/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of 
outstanding
+/// [`Task`] and distributes them amongst its worker threads.
+pub struct Task {
+    /// Maintain a link to the [`ExecutionContext`] this is necessary to be 
able to
+    /// route the output of the partition to its destination
+    context: Arc<ExecutionContext>,
+
+    /// A [`ArcWake`] that can be used to re-schedule this [`Task`] for 
execution
+    waker: Arc<TaskWaker>,
+}
+
+impl std::fmt::Debug for Task {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let output = &self.context.pipelines[self.waker.pipeline].output;
+
+        f.debug_struct("Task")
+            .field("pipeline", &self.waker.pipeline)
+            .field("partition", &self.waker.partition)
+            .field("output", &output)
+            .finish()
+    }
+}
+
+impl Task {
+    fn handle_error(
+        &self,
+        partition: usize,
+        routable: &RoutablePipeline,
+        error: DataFusionError,
+    ) {
+        self.context.send_query_output(partition, Err(error));
+        if let Some(link) = routable.output {
+            trace!(
+                "Closing pipeline: {:?}, partition: {}, due to error",
+                link,
+                self.waker.partition,
+            );
+
+            self.context.pipelines[link.pipeline]
+                .pipeline
+                .close(link.child, self.waker.partition);
+        }
+    }
+
+    /// Call [`Pipeline::poll_partition`], attempting to make progress on 
query execution
+    pub fn do_work(self) {
+        assert!(is_worker(), "Task::do_work called outside of worker pool");
+        if self.context.is_cancelled() {
+            return;
+        }
+
+        // Capture the wake count prior to calling [`Pipeline::poll_partition`]
+        // this allows us to detect concurrent wake ups and handle them 
correctly
+        let wake_count = self.waker.wake_count.load(Ordering::SeqCst);
+
+        let node = self.waker.pipeline;
+        let partition = self.waker.partition;
+
+        let waker = futures::task::waker_ref(&self.waker);
+        let mut cx = Context::from_waker(&*waker);
+
+        let pipelines = &self.context.pipelines;
+        let routable = &pipelines[node];
+        match routable.pipeline.poll_partition(&mut cx, partition) {
+            Poll::Ready(Some(Ok(batch))) => {
+                trace!("Poll {:?}: Ok: {}", self, batch.num_rows());
+                match routable.output {
+                    Some(link) => {
+                        trace!(
+                            "Publishing batch to pipeline {:?} partition {}",
+                            link,
+                            partition
+                        );
+
+                        let r = pipelines[link.pipeline]
+                            .pipeline
+                            .push(batch, link.child, partition);
+
+                        if let Err(e) = r {
+                            self.handle_error(partition, routable, e);
+
+                            // Return without rescheduling this output again
+                            return;
+                        }
+                    }
+                    None => {
+                        trace!("Publishing batch to output");
+                        self.context.send_query_output(partition, Ok(batch))
+                    }
+                }
+
+                // Reschedule this pipeline again
+                //
+                // We want to prioritise running tasks triggered by the most 
recent
+                // batch, so reschedule with FIFO ordering
+                //
+                // Note: We must schedule after we have routed the batch, 
otherwise
+                // we introduce a potential ordering race where the newly 
scheduled
+                // task runs before this task finishes routing the output
+                spawn_local_fifo(self);
+            }
+            Poll::Ready(Some(Err(e))) => {
+                trace!("Poll {:?}: Error: {:?}", self, e);
+                self.handle_error(partition, routable, e)
+            }
+            Poll::Ready(None) => {
+                trace!("Poll {:?}: None", self);
+                match routable.output {
+                    Some(link) => {
+                        trace!("Closing pipeline: {:?}, partition: {}", link, 
partition);
+                        pipelines[link.pipeline]
+                            .pipeline
+                            .close(link.child, partition)
+                    }
+                    None => self.context.finish(partition),
+                }
+            }
+            Poll::Pending => {
+                trace!("Poll {:?}: Pending", self);
+                // Attempt to reset the wake count with the value obtained 
prior
+                // to calling [`Pipeline::poll_partition`].
+                //
+                // If this fails it indicates a wakeup was received whilst 
executing
+                // [`Pipeline::poll_partition`] and we should reschedule the 
task
+                let reset = self.waker.wake_count.compare_exchange(
+                    wake_count,
+                    0,
+                    Ordering::SeqCst,
+                    Ordering::SeqCst,
+                );
+
+                if reset.is_err() {
+                    trace!("Wakeup triggered whilst polling: {:?}", self);
+                    spawn_local(self);
+                }
+            }
+        }
+    }
+}
+
+/// The results of the execution of a query
+pub struct ExecutionResults {
+    /// [`ExecutionResultStream`] for each partition of this query
+    streams: Vec<ExecutionResultStream>,
+
+    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
+    context: Arc<ExecutionContext>,
+}
+
+impl ExecutionResults {
+    /// Returns a [`SendableRecordBatchStream`] of this execution
+    ///
+    /// In the event of multiple output partitions, the output will be 
interleaved
+    pub fn stream(self) -> SendableRecordBatchStream {
+        let schema = self.context.schema.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::select_all(self.streams),
+        ))
+    }
+
+    /// Returns a [`SendableRecordBatchStream`] for each partition of this 
execution
+    pub fn stream_partitioned(self) -> Vec<SendableRecordBatchStream> {
+        self.streams.into_iter().map(|s| Box::pin(s) as _).collect()
+    }
+}
+
+/// A result stream for the execution of a query
+struct ExecutionResultStream {
+    receiver: mpsc::UnboundedReceiver<Option<Result<RecordBatch>>>,
+
+    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
+    context: Arc<ExecutionContext>,
+}
+
+impl Stream for ExecutionResultStream {
+    type Item = arrow::error::Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let opt = ready!(self.receiver.poll_next_unpin(cx)).flatten();
+        Poll::Ready(opt.map(|r| r.map_err(|e| 
ArrowError::ExternalError(Box::new(e)))))
+    }
+}
+
+impl RecordBatchStream for ExecutionResultStream {
+    fn schema(&self) -> SchemaRef {
+        self.context.schema.clone()
+    }
+}
+
+/// The shared state of all [`Task`] created from the same [`PipelinePlan`]
+#[derive(Debug)]
+struct ExecutionContext {
+    /// Spawner for this query
+    spawner: Spawner,
+
+    /// List of pipelines that belong to this query, pipelines are addressed
+    /// based on their index within this list
+    pipelines: Vec<RoutablePipeline>,
+
+    /// Schema of this plans output
+    pub schema: SchemaRef,
+
+    /// The output streams, per partition, for this query's execution
+    output: Vec<mpsc::UnboundedSender<Option<Result<RecordBatch>>>>,
+}
+
+impl Drop for ExecutionContext {
+    fn drop(&mut self) {
+        debug!("ExecutionContext dropped");
+    }
+}
+
+impl ExecutionContext {
+    /// Returns `true` if this query has been dropped, specifically if the
+    /// stream returned by [`super::Scheduler::schedule`] has been dropped
+    fn is_cancelled(&self) -> bool {
+        self.output.iter().all(|x| x.is_closed())
+    }
+
+    /// Sends `output` to this query's output stream
+    fn send_query_output(&self, partition: usize, output: Result<RecordBatch>) 
{
+        let _ = self.output[partition].unbounded_send(Some(output));
+    }
+
+    /// Mark this partition as finished
+    fn finish(&self, partition: usize) {
+        let _ = self.output[partition].unbounded_send(None);
+    }
+}
+
+struct TaskWaker {
+    /// Store a weak reference to the [`ExecutionContext`] to avoid reference 
cycles if this
+    /// [`Waker`] is stored within a [`Pipeline`] owned by the 
[`ExecutionContext`]
+    context: Weak<ExecutionContext>,
+
+    /// A counter that stores the number of times this has been awoken
+    ///
+    /// A value > 0, implies the task is either in the ready queue or
+    /// currently being executed
+    ///
+    /// `TaskWaker::wake` always increments the `wake_count`, however, it only
+    /// re-enqueues the [`Task`] if the value prior to increment was 0
+    ///
+    /// This ensures that a given [`Task`] is not enqueued multiple times
+    ///
+    /// We store an integer, as opposed to a boolean, so that wake ups that
+    /// occur during [`Pipeline::poll_partition`] can be detected and handled
+    /// after it has finished executing
+    ///
+    wake_count: AtomicUsize,
+
+    /// The index of the pipeline within `query` to poll
+    pipeline: usize,
+
+    /// The partition of the pipeline within `query` to poll
+    partition: usize,
+}
+
+impl ArcWake for TaskWaker {
+    fn wake(self: Arc<Self>) {
+        if self.wake_count.fetch_add(1, Ordering::SeqCst) != 0 {
+            trace!("Ignoring duplicate wakeup");
+            return;
+        }
+
+        if let Some(context) = self.context.upgrade() {
+            let task = Task {
+                context,
+                waker: self.clone(),
+            };
+
+            trace!("Wakeup {:?}", task);
+
+            // If called from a worker, spawn to the current worker's
+            // local queue, otherwise reschedule on any worker
+            match is_worker() {
+                true => spawn_local(task),
+                false => task.context.spawner.clone().spawn(task),
+            }
+        } else {
+            trace!("Dropped wakeup");
+        }
+    }
+
+    fn wake_by_ref(s: &Arc<Self>) {
+        ArcWake::wake(s.clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::error::Result;
+    use crate::scheduler::{pipeline::Pipeline, plan::RoutablePipeline, 
Scheduler};
+    use arrow::array::{ArrayRef, Int32Array};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use futures::{channel::oneshot, ready, FutureExt, StreamExt};
+    use parking_lot::Mutex;
+    use std::fmt::Debug;
+    use std::time::Duration;
+
+    /// Tests that waker can be sent to tokio pool
+    #[derive(Debug)]
+    struct TokioPipeline {
+        handle: tokio::runtime::Handle,
+        state: Mutex<State>,
+    }
+
+    #[derive(Debug)]
+    enum State {
+        Init,
+        Wait(oneshot::Receiver<Result<RecordBatch>>),
+        Finished,
+    }
+
+    impl Default for State {
+        fn default() -> Self {
+            Self::Init
+        }
+    }
+
+    impl Pipeline for TokioPipeline {
+        fn push(
+            &self,
+            _input: RecordBatch,
+            _child: usize,
+            _partition: usize,
+        ) -> Result<()> {
+            unreachable!()
+        }
+
+        fn close(&self, _child: usize, _partition: usize) {}
+
+        fn output_partitions(&self) -> usize {
+            1
+        }
+
+        fn poll_partition(
+            &self,
+            cx: &mut Context<'_>,
+            _partition: usize,
+        ) -> Poll<Option<Result<RecordBatch>>> {
+            let mut state = self.state.lock();
+            loop {
+                match &mut *state {
+                    State::Init => {
+                        let (sender, receiver) = oneshot::channel();
+                        self.handle.spawn(async move {
+                            
tokio::time::sleep(Duration::from_millis(10)).await;
+                            let array = Int32Array::from_iter_values([1, 2, 
3]);
+                            sender.send(
+                                RecordBatch::try_from_iter([(
+                                    "int",
+                                    Arc::new(array) as ArrayRef,
+                                )])
+                                .map_err(DataFusionError::ArrowError),
+                            )
+                        });
+                        *state = State::Wait(receiver)
+                    }
+                    State::Wait(r) => {
+                        let v = ready!(r.poll_unpin(cx)).ok();
+                        *state = State::Finished;
+                        return Poll::Ready(v);
+                    }
+                    State::Finished => return Poll::Ready(None),
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_tokio_waker() {
+        let scheduler = Scheduler::new(2);
+
+        // A tokio runtime
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_time()
+            .build()
+            .unwrap();
+
+        // A pipeline that dispatches to a tokio worker
+        let pipeline = TokioPipeline {
+            handle: runtime.handle().clone(),
+            state: Default::default(),
+        };
+
+        let plan = PipelinePlan {
+            schema: Arc::new(Schema::new(vec![Field::new(
+                "int",
+                DataType::Int32,
+                false,
+            )])),
+            output_partitions: 1,
+            pipelines: vec![RoutablePipeline {
+                pipeline: Box::new(pipeline),
+                output: None,
+            }],
+        };
+
+        let mut receiver = scheduler.schedule_plan(plan).stream();
+
+        runtime.block_on(async move {
+            // Should wait for output
+            let batch = receiver.next().await.unwrap().unwrap();
+            assert_eq!(batch.num_rows(), 3);
+
+            // Next batch should be none
+            assert!(receiver.next().await.is_none());
+        })
+    }
+}

Reply via email to