This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new dc76ec1a8 Introduce new optional scheduler, using Morsel-driven
Parallelism + rayon (#2199) (#2226)
dc76ec1a8 is described below
commit dc76ec1a83c264bdaebdab95a63daa71f2cae743
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed May 4 15:12:16 2022 +0100
Introduce new optional scheduler, using Morsel-driven Parallelism + rayon
(#2199) (#2226)
* Morsel-driven Parallelism using rayon (#2199)
* Fix LIFO spawn ordering
* Further docs for ExecutionPipeline
* Deduplicate concurrent wakes
* Add license headers
* Sort Cargo.toml
* Revert accidental change to ParquetExec
* Handle wakeups triggered by other threads
* Use SeqCst memory ordering
* Review feedback
* Add panic handler
* Cleanup structs
Add test of tokio interoperation
* Review feedback
* Use BatchPartitioner
Cleanup error handling
* Clarify shutdown characteristics
* Fix racy test_panic
* Don't overload Query nomenclature
* Rename QueryResults to ExecutionResults
* Further review feedback
* Merge scheduler into datafusion/core
* Review feedback
* Fix partitioned execution
* Format
* Format Cargo.toml
* Fix doc link
---
CONTRIBUTING.md | 2 +-
Cargo.toml | 2 +-
datafusion/core/Cargo.toml | 9 +-
datafusion/core/benches/parquet_query_sql.rs | 65 ++-
datafusion/core/src/lib.rs | 2 +
datafusion/core/src/scheduler/mod.rs | 454 +++++++++++++++++++
.../core/src/scheduler/pipeline/execution.rs | 330 ++++++++++++++
datafusion/core/src/scheduler/pipeline/mod.rs | 110 +++++
.../core/src/scheduler/pipeline/repartition.rs | 157 +++++++
datafusion/core/src/scheduler/plan.rs | 296 ++++++++++++
datafusion/core/src/scheduler/task.rs | 497 +++++++++++++++++++++
11 files changed, 1905 insertions(+), 19 deletions(-)
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index ab3381dff..4f0fe7163 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -150,7 +150,7 @@ The parquet SQL benchmarks can be run with
cargo bench --bench parquet_query_sql
```
-These randomly generate a parquet file, and then benchmark queries sourced
from [parquet_query_sql.sql](./datafusion/benches/parquet_query_sql.sql)
against it. This can therefore be a quick way to add coverage of particular
query and/or data paths.
+These randomly generate a parquet file, and then benchmark queries sourced
from [parquet_query_sql.sql](./datafusion/core/benches/parquet_query_sql.sql)
against it. This can therefore be a quick way to add coverage of particular
query and/or data paths.
If the environment variable `PARQUET_FILE` is set, the benchmark will run
queries against this file instead of a randomly generated one. This can be
useful for performing multiple runs, potentially with different code, against
the same source data, or for testing against a custom dataset.
diff --git a/Cargo.toml b/Cargo.toml
index fefd5679a..297b394b4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,8 +17,8 @@
[workspace]
members = [
- "datafusion/core",
"datafusion/common",
+ "datafusion/core",
"datafusion/expr",
"datafusion/jit",
"datafusion/physical-expr",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 551d3e59b..6dc8acb3d 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion"
readme = "../README.md"
authors = ["Apache Arrow <[email protected]>"]
license = "Apache-2.0"
-keywords = [ "arrow", "query", "sql" ]
+keywords = ["arrow", "query", "sql"]
include = [
"benches/*.rs",
"src/**/*.rs",
@@ -50,6 +50,8 @@ pyarrow = ["pyo3", "arrow/pyarrow",
"datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions"]
# Used to enable row format experiment
row = ["datafusion-row"]
+# Used to enable scheduler
+scheduler = ["rayon"]
simd = ["arrow/simd"]
unicode_expressions = ["datafusion-physical-expr/regex_expressions"]
@@ -75,9 +77,10 @@ ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "13", features = ["arrow"] }
paste = "^1.0"
-pin-project-lite= "^0.2.7"
+pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
rand = "0.8"
+rayon = { version = "1.5", optional = true }
smallvec = { version = "1.6", features = ["union"] }
sqlparser = "0.16"
tempfile = "3"
@@ -88,6 +91,7 @@ uuid = { version = "1.0", features = ["v4"] }
[dev-dependencies]
criterion = "0.3"
doc-comment = "0.3"
+env_logger = "0.9"
fuzz-utils = { path = "fuzz-utils" }
[[bench]]
@@ -121,6 +125,7 @@ name = "physical_plan"
[[bench]]
harness = false
name = "parquet_query_sql"
+required-features = ["scheduler"]
[[bench]]
harness = false
diff --git a/datafusion/core/benches/parquet_query_sql.rs
b/datafusion/core/benches/parquet_query_sql.rs
index 08156fad4..26a041fb4 100644
--- a/datafusion/core/benches/parquet_query_sql.rs
+++ b/datafusion/core/benches/parquet_query_sql.rs
@@ -24,7 +24,9 @@ use arrow::datatypes::{
};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
-use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::scheduler::Scheduler;
+use futures::stream::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distributions::uniform::SampleUniform;
@@ -37,7 +39,6 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tempfile::NamedTempFile;
-use tokio_stream::StreamExt;
/// The number of batches to write
const NUM_BATCHES: usize = 2048;
@@ -193,15 +194,24 @@ fn criterion_benchmark(c: &mut Criterion) {
assert!(Path::new(&file_path).exists(), "path not found");
println!("Using parquet file {}", file_path);
- let context = SessionContext::new();
+ let partitions = 4;
+ let config = SessionConfig::new().with_target_partitions(partitions);
+ let context = SessionContext::with_config(config);
- let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
- rt.block_on(context.register_parquet(
- "t",
- file_path.as_str(),
- ParquetReadOptions::default(),
- ))
- .unwrap();
+ let scheduler = Scheduler::new(partitions);
+
+ let local_rt = tokio::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+
+ let query_rt = tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(partitions)
+ .build()
+ .unwrap();
+
+ local_rt
+ .block_on(context.register_parquet("t", file_path.as_str(),
Default::default()))
+ .unwrap();
// We read the queries from a file so they can be changed without
recompiling the benchmark
let mut queries_file =
File::open("benches/parquet_query_sql.sql").unwrap();
@@ -220,17 +230,42 @@ fn criterion_benchmark(c: &mut Criterion) {
continue;
}
- let query = query.as_str();
- c.bench_function(query, |b| {
+ c.bench_function(&format!("tokio: {}", query), |b| {
b.iter(|| {
+ let query = query.clone();
let context = context.clone();
- rt.block_on(async move {
- let query = context.sql(query).await.unwrap();
+ let (sender, mut receiver) =
futures::channel::mpsc::unbounded();
+
+ // Spawn work to a separate tokio thread pool
+ query_rt.spawn(async move {
+ let query = context.sql(&query).await.unwrap();
let mut stream = query.execute_stream().await.unwrap();
- while criterion::black_box(stream.next().await).is_some()
{}
+
+ while let Some(next) = stream.next().await {
+ sender.unbounded_send(next).unwrap();
+ }
+ });
+
+ local_rt.block_on(async {
+ while receiver.next().await.transpose().unwrap().is_some()
{}
})
});
});
+
+ c.bench_function(&format!("scheduled: {}", query), |b| {
+ b.iter(|| {
+ let query = query.clone();
+ let context = context.clone();
+
+ local_rt.block_on(async {
+ let query = context.sql(&query).await.unwrap();
+ let plan = query.create_physical_plan().await.unwrap();
+ let mut stream =
+ scheduler.schedule(plan, context.task_ctx()).unwrap();
+ while stream.next().await.transpose().unwrap().is_some() {}
+ });
+ });
+ });
}
// Temporary file must outlive the benchmarks, it is deleted when dropped
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 055a17f4e..c598d9a33 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -218,6 +218,8 @@ pub mod physical_optimizer;
pub mod physical_plan;
pub mod prelude;
pub mod scalar;
+#[cfg(feature = "scheduler")]
+pub mod scheduler;
pub mod sql;
pub mod variable;
diff --git a/datafusion/core/src/scheduler/mod.rs
b/datafusion/core/src/scheduler/mod.rs
new file mode 100644
index 000000000..a765ddf83
--- /dev/null
+++ b/datafusion/core/src/scheduler/mod.rs
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`Scheduler`] maintains a pool of dedicated worker threads on which
+//! query execution can be scheduled. This is based on the idea of
[Morsel-Driven Parallelism]
+//! and is designed to decouple the execution parallelism from the parallelism
expressed in
+//! the physical plan as partitions.
+//!
+//! # Implementation
+//!
+//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it
up into smaller
+//! chunks called pipelines. Each pipeline may consist of one or more nodes
from the
+//! [`ExecutionPlan`] tree.
+//!
+//! The scheduler then maintains a list of pending [`Task`]s that identify a partition within
+//! a particular pipeline that may be able to make progress on some "morsel"
of data. These
+//! [`Task`] are then scheduled on the worker pool, with a preference for
scheduling work
+//! on a given "morsel" on the same thread that produced it.
+//!
+//! # Rayon
+//!
+//! Under-the-hood these [`Task`] are scheduled by [rayon], which is a
lightweight, work-stealing
+//! scheduler optimised for CPU-bound workloads. Pipelines may exploit this
fact, and use [rayon]'s
+//! structured concurrency primitives to express additional parallelism that
may be exploited
+//! if there are idle threads available at runtime.
+//!
+//! # Shutdown
+//!
+//! Queries scheduled on a [`Scheduler`] will run to completion even if the
+//! [`Scheduler`] is dropped
+//!
+//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+//! [rayon]: https://docs.rs/rayon/latest/rayon/
+//!
+//! # Example
+//!
+//! ```rust
+//! # use futures::TryStreamExt;
+//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+//! # use datafusion::scheduler::Scheduler;
+//!
+//! # #[tokio::main]
+//! # async fn main() {
+//! let scheduler = Scheduler::new(4);
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let context = SessionContext::with_config(config);
+//!
+//! context.register_csv("example", "../core/tests/example.csv",
CsvReadOptions::new()).await.unwrap();
+//! let plan = context.sql("SELECT MIN(b) FROM example")
+//! .await
+//! .unwrap()
+//! .create_physical_plan()
+//! .await
+//! .unwrap();
+//!
+//! let task = context.task_ctx();
+//! let stream = scheduler.schedule(plan, task).unwrap();
+//! let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+//! # }
+//! ```
+//!
+
+use std::sync::Arc;
+
+use futures::stream::BoxStream;
+use log::{debug, error};
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::ExecutionPlan;
+
+use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline};
+use task::{spawn_plan, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+pub use task::ExecutionResults;
+
+mod pipeline;
+mod plan;
+mod task;
+
+/// Builder for a [`Scheduler`]
+#[derive(Debug)]
+pub struct SchedulerBuilder {
+    inner: ThreadPoolBuilder,
+}
+
+impl SchedulerBuilder {
+    /// Create a new [`SchedulerBuilder`] with the provided number of threads
+    ///
+    /// Worker threads are named `df-worker-{idx}`, and by default a panic on a worker
+    /// is logged at error level instead of aborting the process
+    pub fn new(num_threads: usize) -> Self {
+        let builder = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .panic_handler(|p| error!("{}", format_worker_panic(p)))
+            .thread_name(|idx| format!("df-worker-{}", idx));
+
+        Self { inner: builder }
+    }
+
+    /// Registers a custom panic handler, replacing the default logging handler
+    #[cfg(test)]
+    fn panic_handler<H>(self, panic_handler: H) -> Self
+    where
+        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
+    {
+        Self {
+            inner: self.inner.panic_handler(panic_handler),
+        }
+    }
+
+    /// Build a new [`Scheduler`]
+    ///
+    /// # Panics
+    ///
+    /// Panics if the underlying rayon thread pool cannot be created
+    pub fn build(self) -> Scheduler {
+        Scheduler {
+            pool: Arc::new(
+                self.inner
+                    .build()
+                    .expect("failed to create scheduler thread pool"),
+            ),
+        }
+    }
+}
+
+/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a dedicated thread pool
+#[derive(Debug)]
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool
+    pub fn new(num_threads: usize) -> Self {
+        SchedulerBuilder::new(num_threads).build()
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns an [`ExecutionResults`] that can be used to receive results as they are
+    /// produced, as a [`futures::Stream`] of
+    /// [`RecordBatch`](crate::arrow::record_batch::RecordBatch)
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `plan` cannot be broken up into pipelines
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<ExecutionResults> {
+        let plan = PipelinePlanner::new(plan, context).build()?;
+        Ok(self.schedule_plan(plan))
+    }
+
+    /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`].
+    pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults {
+        spawn_plan(plan, self.spawner())
+    }
+
+    /// Returns a [`Spawner`] that submits work to this scheduler's thread pool
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Renders a worker panic payload as a human-readable message
+///
+/// The worker is identified by its rayon thread index, or `UNKNOWN` if the current
+/// thread has none. Payloads that are neither `&str` nor `String` are reported as `UNKNOWN`
+fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
+    let worker = rayon::current_thread_index()
+        .map(|idx| idx.to_string())
+        .unwrap_or_else(|| "UNKNOWN".to_string());
+
+    let message: &str = match (panic.downcast_ref::<&str>(), panic.downcast_ref::<String>()) {
+        (Some(text), _) => *text,
+        (None, Some(text)) => text.as_str(),
+        (None, None) => "UNKNOWN",
+    };
+
+    format!("worker {} panicked with: {}", worker, message)
+}
+
+/// Returns `true` if the current thread is a rayon worker thread
+///
+/// Note: if there are multiple rayon pools, this will return `true` if the
current thread
+/// belongs to ANY rayon pool, even if this isn't a worker thread of a
[`Scheduler`] instance
+fn is_worker() -> bool {
+ rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the thread pool of the calling worker
+///
+/// Execution order is not guaranteed, as idle workers may steal queued tasks at any time.
+/// `spawn_local` pushes onto the front of the calling worker's queue; workers take work
+/// from the front of their own queue and steal from the back of other workers' queues.
+///
+/// As a result, tasks spawned with `spawn_local` tend to run in LIFO order, but this
+/// must not be relied upon
+///
+/// # Panics
+///
+/// Panics if not called from a rayon worker thread
+fn spawn_local(task: Task) {
+    // Guard against use off a worker, which would lazily create rayon's global pool
+    if !is_worker() {
+        panic!("must be called from a worker");
+    }
+    let work = move || task.do_work();
+    rayon::spawn(work)
+}
+
+/// Spawn a [`Task`] onto the thread pool of the calling worker, with FIFO ordering
+///
+/// Execution order is not guaranteed, as idle workers may steal queued tasks at any time.
+/// `spawn_local_fifo` pushes onto the back of the calling worker's queue; workers take
+/// work from the front of their own queue and steal from the back of other workers' queues.
+///
+/// As a result, tasks spawned with `spawn_local_fifo` tend to run in FIFO order, but this
+/// must not be relied upon
+///
+/// # Panics
+///
+/// Panics if not called from a rayon worker thread
+fn spawn_local_fifo(task: Task) {
+    // Guard against use off a worker, which would lazily create rayon's global pool
+    if !is_worker() {
+        panic!("must be called from a worker");
+    }
+    let work = move || task.do_work();
+    rayon::spawn_fifo(work)
+}
+
+/// A cloneable handle that submits [`Task`]s to a [`Scheduler`]'s thread pool
+#[derive(Debug, Clone)]
+pub struct Spawner {
+    pool: Arc<ThreadPool>,
+}
+
+impl Spawner {
+    /// Submit `task` to the thread pool, where any worker may pick it up
+    pub fn spawn(&self, task: Task) {
+        debug!("Spawning {:?} to any worker", task);
+        let pool = &self.pool;
+        pool.spawn(move || task.do_work());
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::util::pretty::pretty_format_batches;
+    use std::ops::Range;
+    use std::panic::panic_any;
+
+    use futures::{StreamExt, TryStreamExt};
+    use log::info;
+    use rand::distributions::uniform::SampleUniform;
+    use rand::{thread_rng, Rng};
+
+    use crate::arrow::array::{ArrayRef, PrimitiveArray};
+    use crate::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type};
+    use crate::arrow::record_batch::RecordBatch;
+    use crate::datasource::{MemTable, TableProvider};
+    use crate::physical_plan::displayable;
+    use crate::prelude::{SessionConfig, SessionContext};
+
+    use super::*;
+
+    /// Generates an array of `len` random values drawn from `range`, where each
+    /// value is non-null with probability `valid_percent`
+    fn generate_primitive<T, R>(
+        rng: &mut R,
+        len: usize,
+        valid_percent: f64,
+        range: Range<T::Native>,
+    ) -> ArrayRef
+    where
+        T: ArrowPrimitiveType,
+        T::Native: SampleUniform,
+        R: Rng,
+    {
+        Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+            rng.gen_bool(valid_percent)
+                .then(|| rng.gen_range(range.clone()))
+        })))
+    }
+
+    /// Generates a batch of `row_count` rows with nullable random columns `a` and `b`,
+    /// and a non-null `id` column counting up from `id_offset`
+    fn generate_batch<R: Rng>(
+        rng: &mut R,
+        row_count: usize,
+        id_offset: i32,
+    ) -> RecordBatch {
+        let id_range = id_offset..(row_count as i32 + id_offset);
+        let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 0..1000);
+        let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0.0..1000.0);
+        let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range);
+
+        RecordBatch::try_from_iter_with_nullable([
+            ("a", a, true),
+            ("b", b, true),
+            ("id", Arc::new(id), false),
+        ])
+        .unwrap()
+    }
+
+    const BATCHES_PER_PARTITION: usize = 20;
+    const ROWS_PER_BATCH: usize = 100;
+    const NUM_PARTITIONS: usize = 2;
+
+    /// Generates `NUM_PARTITIONS` partitions of `BATCHES_PER_PARTITION` random batches,
+    /// with `id` values that are unique across all batches
+    fn make_batches() -> Vec<Vec<RecordBatch>> {
+        let mut rng = thread_rng();
+
+        let mut id_offset = 0;
+
+        (0..NUM_PARTITIONS)
+            .map(|_| {
+                (0..BATCHES_PER_PARTITION)
+                    .map(|_| {
+                        let batch = generate_batch(&mut rng, ROWS_PER_BATCH, id_offset);
+                        id_offset += ROWS_PER_BATCH as i32;
+                        batch
+                    })
+                    .collect()
+            })
+            .collect()
+    }
+
+    /// Creates an in-memory table containing the output of `make_batches`
+    fn make_provider() -> Arc<dyn TableProvider> {
+        let batches = make_batches();
+        let schema = batches.first().unwrap().first().unwrap().schema();
+        // Reuse the batches the schema was taken from, rather than generating (and
+        // discarding) a second full set of random data
+        Arc::new(MemTable::try_new(schema, batches).unwrap())
+    }
+
+    fn init_logging() {
+        let _ = env_logger::builder().is_test(true).try_init();
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+
+        context.register_table("table1", make_provider()).unwrap();
+        context.register_table("table2", make_provider()).unwrap();
+
+        let queries = [
+            "select * from table1 order by id",
+            "select * from table1 where table1.a > 100 order by id",
+            "select distinct a from table1 where table1.b > 100 order by a",
+            "select * from table1 join table2 on table1.id = table2.id order by table1.id",
+            "select id from table1 union all select id from table2 order by id",
+            "select id from table1 union all select id from table2 where a > 100 order by id",
+            "select id, b from (select id, b from table1 union all select id, b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
+            "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order by id",
+            "select count(*) from table1 where table1.a > 4",
+        ];
+
+        for sql in queries {
+            let task = context.task_ctx();
+
+            let query = context.sql(sql).await.unwrap();
+
+            let plan = query.create_physical_plan().await.unwrap();
+
+            info!("Plan: {}", displayable(plan.as_ref()).indent());
+
+            let stream = scheduler.schedule(plan, task).unwrap().stream();
+            let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+            let expected = query.collect().await.unwrap();
+
+            let total_expected = expected.iter().map(|x| x.num_rows()).sum::<usize>();
+            let total_scheduled = scheduled.iter().map(|x| x.num_rows()).sum::<usize>();
+            assert_eq!(total_expected, total_scheduled);
+
+            info!("Query \"{}\" produced {} rows", sql, total_expected);
+
+            let expected = pretty_format_batches(&expected).unwrap().to_string();
+            let scheduled = pretty_format_batches(&scheduled).unwrap().to_string();
+
+            assert_eq!(
+                expected, scheduled,
+                "\n\nexpected:\n\n{}\nactual:\n\n{}\n\n",
+                expected, scheduled
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_partitioned() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+        let plan = context
+            .read_table(make_provider())
+            .unwrap()
+            .create_physical_plan()
+            .await
+            .unwrap();
+
+        assert_eq!(plan.output_partitioning().partition_count(), NUM_PARTITIONS);
+
+        let results = scheduler
+            .schedule(plan.clone(), context.task_ctx())
+            .unwrap();
+
+        let batches = results.stream().try_collect::<Vec<_>>().await.unwrap();
+        assert_eq!(batches.len(), NUM_PARTITIONS * BATCHES_PER_PARTITION);
+
+        for batch in batches {
+            assert_eq!(batch.num_rows(), ROWS_PER_BATCH)
+        }
+
+        let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
+        let streams = results.stream_partitioned();
+
+        let partitions: Vec<Vec<_>> =
+            futures::future::try_join_all(streams.into_iter().map(|s| s.try_collect()))
+                .await
+                .unwrap();
+
+        assert_eq!(partitions.len(), NUM_PARTITIONS);
+        for batches in partitions {
+            assert_eq!(batches.len(), BATCHES_PER_PARTITION);
+            for batch in batches {
+                assert_eq!(batch.num_rows(), ROWS_PER_BATCH);
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_panic() {
+        init_logging();
+
+        let do_test = |scheduler: Scheduler| {
+            scheduler.pool.spawn(|| panic!("test"));
+            scheduler.pool.spawn(|| panic!("{}", 1));
+            scheduler.pool.spawn(|| panic_any(21));
+        };
+
+        // The default panic handler should log panics and not abort the process
+        do_test(Scheduler::new(1));
+
+        // Override panic handler and capture panics to test formatting
+        let (sender, receiver) = futures::channel::mpsc::unbounded();
+        let scheduler = SchedulerBuilder::new(1)
+            .panic_handler(move |panic| {
+                let _ = sender.unbounded_send(format_worker_panic(panic));
+            })
+            .build();
+
+        do_test(scheduler);
+
+        // Sort as order not guaranteed
+        let mut buffer: Vec<_> = receiver.collect().await;
+        buffer.sort_unstable();
+
+        assert_eq!(buffer.len(), 3);
+        assert_eq!(buffer[0], "worker 0 panicked with: 1");
+        assert_eq!(buffer[1], "worker 0 panicked with: UNKNOWN");
+        assert_eq!(buffer[2], "worker 0 panicked with: test");
+    }
+}
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs
b/datafusion/core/src/scheduler/pipeline/execution.rs
new file mode 100644
index 000000000..baf487d98
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/execution.rs
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+use arrow::error::ArrowError;
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::MetricsSet;
+use crate::physical_plan::{
+ displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+
+use crate::scheduler::pipeline::Pipeline;
+use crate::scheduler::BoxStream;
+
+/// Adapts a portion of an [`ExecutionPlan`] to the push-based [`Pipeline`] interface
+///
+/// Internally execution is still pull-based, so the parallelism of an
+/// [`ExecutionPipeline`] is limited to that of its output partitioning. In exchange it is
+/// fully compatible with [`ExecutionPlan`], and so interoperates with the existing ecosystem.
+///
+/// Longer term we will likely want new traits that distinguish pipeline-able operators,
+/// like filters, from pipeline-breakers, like aggregations, and that are better aligned
+/// with a push-based execution model. These would allow [`Pipeline`] implementations to
+/// introduce parallelism beyond that expressed in their partitioning
+pub struct ExecutionPipeline {
+    /// The wrapped plan, with the children of its splice point replaced by proxies
+    proxied: Arc<dyn ExecutionPlan>,
+    /// Input buffers, indexed by `[child][partition]`
+    inputs: Vec<Vec<Arc<Mutex<InputPartition>>>>,
+    /// One output stream per output partition of `proxied`
+    outputs: Vec<Mutex<BoxStream<'static, ArrowResult<RecordBatch>>>>,
+}
+
+impl std::fmt::Debug for ExecutionPipeline {
+    /// Renders the wrapped plan as a tree of operators, labelled `ExecutionNode`
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("ExecutionNode")
+            .field(&debug_tree(self.proxied.as_ref()))
+            .finish()
+    }
+}
+
+impl ExecutionPipeline {
+ pub fn new(
+ plan: Arc<dyn ExecutionPlan>,
+ task_context: Arc<TaskContext>,
+ depth: usize,
+ ) -> Result<Self> {
+ // The point in the plan at which to splice the plan graph
+ let mut splice_point = plan;
+ let mut parent_plans = Vec::with_capacity(depth.saturating_sub(1));
+ for _ in 0..depth {
+ let children = splice_point.children();
+ assert_eq!(
+ children.len(),
+ 1,
+ "can only group through nodes with a single child"
+ );
+ parent_plans.push(splice_point);
+ splice_point = children.into_iter().next().unwrap();
+ }
+
+ // The children to replace with [`ProxyExecutionPlan`]
+ let children = splice_point.children();
+ let mut inputs = Vec::with_capacity(children.len());
+
+ // The spliced plan with its children replaced with
[`ProxyExecutionPlan`]
+ let spliced = if !children.is_empty() {
+ let mut proxies: Vec<Arc<dyn ExecutionPlan>> =
+ Vec::with_capacity(children.len());
+
+ for child in children {
+ let count = child.output_partitioning().partition_count();
+
+ let mut child_inputs = Vec::with_capacity(count);
+ for _ in 0..count {
+ child_inputs.push(Default::default())
+ }
+
+ inputs.push(child_inputs.clone());
+ proxies.push(Arc::new(ProxyExecutionPlan {
+ inner: child,
+ inputs: child_inputs,
+ }));
+ }
+
+ splice_point.with_new_children(proxies)?
+ } else {
+ splice_point.clone()
+ };
+
+ // Reconstruct the parent graph
+ let mut proxied = spliced;
+ for parent in parent_plans.into_iter().rev() {
+ proxied = parent.with_new_children(vec![proxied])?
+ }
+
+ // Construct the output streams
+ let output_count = proxied.output_partitioning().partition_count();
+ let outputs = (0..output_count)
+ .map(|x| {
+ let proxy_captured = proxied.clone();
+ let task_captured = task_context.clone();
+ let fut = async move {
+ proxy_captured
+ .execute(x, task_captured)
+ .await
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+ };
+
+ // Use futures::stream::once to handle operators that perform
computation
+ // within `ExecutionPlan::execute`. If we evaluated these
futures here
+ // we could potentially block indefinitely waiting for inputs
that will
+ // never arrive as the query isn't scheduled yet
+ Mutex::new(futures::stream::once(fut).try_flatten().boxed())
+ })
+ .collect();
+
+ Ok(Self {
+ proxied,
+ inputs,
+ outputs,
+ })
+ }
+}
+
+impl Pipeline for ExecutionPipeline {
+    /// Buffer a [`RecordBatch`] for the given input partition and wake any task waiting
+    /// on it
+    ///
+    /// Panics if the partition has already been closed
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
+        let mut state = self.inputs[child][partition].lock();
+        assert!(!state.is_closed);
+
+        state.buffer.push_back(input);
+        state.wait_list.drain(..).for_each(Waker::wake);
+        Ok(())
+    }
+
+    /// Mark the given input partition as closed and wake any task waiting on it
+    ///
+    /// Panics if the partition has already been closed
+    fn close(&self, child: usize, partition: usize) {
+        let mut state = self.inputs[child][partition].lock();
+        assert!(!state.is_closed);
+
+        state.is_closed = true;
+        state.wait_list.drain(..).for_each(Waker::wake);
+    }
+
+    /// The number of output partitions, one per output stream
+    fn output_partitions(&self) -> usize {
+        self.outputs.len()
+    }
+
+    /// Poll an output partition, attempting to get its output
+    fn poll_partition(
+        &self,
+        cx: &mut Context<'_>,
+        partition: usize,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        let mut output = self.outputs[partition].lock();
+        match output.poll_next_unpin(cx) {
+            Poll::Ready(Some(Ok(batch))) => Poll::Ready(Some(Ok(batch))),
+            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+/// Shared state for a single input partition of an [`ExecutionPipeline`]
+#[derive(Debug, Default)]
+struct InputPartition {
+    /// Batches that have been pushed but not yet consumed, in push order
+    buffer: VecDeque<RecordBatch>,
+    /// Wakers to notify when a batch is pushed or the partition is closed
+    wait_list: Vec<Waker>,
+    /// Set once the partition is closed and no further batches may be pushed
+    is_closed: bool,
+}
+
+/// A stream yielding the batches pushed to a shared [`InputPartition`]
+struct InputPartitionStream {
+    /// Schema of the batches yielded by this stream
+    schema: SchemaRef,
+    /// The shared partition state that batches are taken from
+    partition: Arc<Mutex<InputPartition>>,
+}
+
+impl Stream for InputPartitionStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    /// Yields buffered batches in push order, then `None` once the partition is closed
+    /// and drained. Otherwise registers the task's waker and returns [`Poll::Pending`]
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut partition = self.partition.lock();
+        match partition.buffer.pop_front() {
+            Some(batch) => Poll::Ready(Some(Ok(batch))),
+            None if partition.is_closed => Poll::Ready(None),
+            None => {
+                // Skip registration if an equivalent waker is already queued, so that
+                // repeated polls by the same task before the next push/close do not
+                // accumulate duplicate wakers (and duplicate wake-ups)
+                let waker = cx.waker();
+                if !partition.wait_list.iter().any(|w| w.will_wake(waker)) {
+                    partition.wait_list.push(waker.clone());
+                }
+                Poll::Pending
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for InputPartitionStream {
+    /// Returns the schema of the batches yielded by this stream
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+/// A stand-in for a child of a wrapped [`ExecutionPlan`], whose `execute` returns an
+/// [`InputPartitionStream`] in place of the child's own output
+///
+/// This is a workaround that is hopefully temporary, pending a rework of [`ExecutionPlan`]
+#[derive(Debug)]
+struct ProxyExecutionPlan {
+    /// The original child, consulted for metadata such as schema and partitioning
+    inner: Arc<dyn ExecutionPlan>,
+
+    /// One shared input buffer per output partition of `inner`
+    inputs: Vec<Arc<Mutex<InputPartition>>>,
+}
+
+#[async_trait]
+impl ExecutionPlan for ProxyExecutionPlan {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.inner.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.inner.output_ordering()
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        self.inner.required_child_distribution()
+    }
+
+    fn relies_on_input_order(&self) -> bool {
+        self.inner.relies_on_input_order()
+    }
+
+    fn maintains_input_order(&self) -> bool {
+        self.inner.maintains_input_order()
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        self.inner.benefits_from_input_partitioning()
+    }
+
+    /// The proxy hides the wrapped child's subtree, so reports no children
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    /// As this node reports no children, the only valid replacement is an empty list,
+    /// for which the node is returned unchanged. Any other input is an internal error
+    /// rather than a panic.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            let plan: Arc<dyn ExecutionPlan> = self;
+            Ok(plan)
+        } else {
+            Err(crate::error::DataFusionError::Internal(
+                "ProxyExecutionPlan does not support with_new_children".to_string(),
+            ))
+        }
+    }
+
+    /// Returns a stream of the batches pushed to the given input partition
+    async fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(InputPartitionStream {
+            schema: self.schema(),
+            partition: self.inputs[partition].clone(),
+        }))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        self.inner.metrics()
+    }
+
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "ProxyExecutionPlan")
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.inner.statistics()
+    }
+}
+
+/// An owned snapshot of an [`ExecutionPlan`] tree, used for `Debug` output
+struct NodeDescriptor {
+    /// One-line rendering of the operator
+    operator: String,
+    /// Descriptors of the operator's children
+    children: Vec<NodeDescriptor>,
+}
+
+impl std::fmt::Debug for NodeDescriptor {
+    /// Formats as a struct named after the operator, with a single `children` field
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut builder = f.debug_struct(&self.operator);
+        builder.field("children", &self.children);
+        builder.finish()
+    }
+}
+
+/// Recursively builds a [`NodeDescriptor`] tree mirroring `plan` and its children
+fn debug_tree(plan: &dyn ExecutionPlan) -> NodeDescriptor {
+    let operator = displayable(plan).one_line().to_string();
+    let children = plan
+        .children()
+        .iter()
+        .map(|child| debug_tree(child.as_ref()))
+        .collect();
+
+    NodeDescriptor { operator, children }
+}
diff --git a/datafusion/core/src/scheduler/pipeline/mod.rs
b/datafusion/core/src/scheduler/pipeline/mod.rs
new file mode 100644
index 000000000..824a6950e
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/mod.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::error::Result;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may draw on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`](crate::physical_plan::ExecutionPlan), but rather
+/// a generic interface that parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Eager vs Lazy Execution
+///
+/// Whether computation is eagerly done on push, or lazily done on pull, is
+/// intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
+/// any output partitions that may now be able to make progress. This may be none if
+/// the operator is waiting on data from a different input partition
+///
+/// An aggregation pipeline which combines data from one or more input partitions into
+/// a single output partition. [`Pipeline::push`] would eagerly update the computed
+/// aggregates, and the final [`Pipeline::close`] would trigger flushing these to the output.
+/// It would also be possible to flush once the partial aggregates reach a certain size
+///
+/// A partition-aware aggregation pipeline, which functions similarly to the above, but
+/// computes aggregations per input partition, before combining these prior to flush.
+///
+/// An async input pipeline, which has no inputs, and wakes the output partition
+/// whenever new data is available
+///
+/// A JIT compiled sequence of synchronous operators, that perform multiple operations
+/// from the physical plan as a single [`Pipeline`]. Parallelized implementations
+/// are also possible
+///
+pub trait Pipeline: Send + Sync + std::fmt::Debug {
+    /// Push a [`RecordBatch`] to the given input partition
+    ///
+    /// `child` identifies the upstream input, and `partition` the partition of
+    /// that input the batch belongs to
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;
+
+    /// Mark an input partition as exhausted
+    ///
+    /// Callers should not push further batches to the input partition identified
+    /// by `child` and `partition` after closing it
+    fn close(&self, child: usize, partition: usize);
+
+    /// Returns the number of output partitions
+    fn output_partitions(&self) -> usize;
+
+    /// Attempt to pull out the next value of the given output partition, registering the
+    /// current task for wakeup if the value is not yet available, and returning `None`
+    /// if the output partition is exhausted and will never yield any further values
+    ///
+    /// # Return value
+    ///
+    /// There are several possible return values:
+    ///
+    /// - `Poll::Pending` indicates that this partition's next value is not ready yet.
+    /// Implementations should use the waker provided by `cx` to notify the scheduler when
+    /// progress may be able to be made
+    ///
+    /// - `Poll::Ready(Some(Ok(val)))` returns the next value from this output partition,
+    /// the output partition should be polled again as it may have further values. The returned
+    /// value will be routed to the next pipeline in the query
+    ///
+    /// - `Poll::Ready(Some(Err(e)))` returns an error that will be routed to the query's output
+    /// and the query execution aborted.
+    ///
+    /// - `Poll::Ready(None)` indicates that this partition is exhausted and will not produce any
+    /// further values.
+    ///
+    fn poll_partition(
+        &self,
+        cx: &mut Context<'_>,
+        partition: usize,
+    ) -> Poll<Option<Result<RecordBatch>>>;
+}
diff --git a/datafusion/core/src/scheduler/pipeline/repartition.rs
b/datafusion/core/src/scheduler/pipeline/repartition.rs
new file mode 100644
index 000000000..c35ab909f
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/repartition.rs
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll, Waker};
+
+use parking_lot::Mutex;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::Result;
+use crate::physical_plan::repartition::BatchPartitioner;
+use crate::physical_plan::Partitioning;
+
+use crate::scheduler::pipeline::Pipeline;
+
+/// A [`Pipeline`] that can repartition its input
+///
+/// Batches pushed to any input partition are split by a `BatchPartitioner` into
+/// per-output-partition buffers, which are drained by [`Pipeline::poll_partition`]
+#[derive(Debug)]
+pub struct RepartitionPipeline {
+    /// Number of output partitions, fixed at construction
+    output_count: usize,
+    /// State shared between `push`, `close` and `poll_partition`
+    state: Mutex<RepartitionState>,
+}
+
+impl RepartitionPipeline {
+ /// Create a new [`RepartitionPipeline`] with the given `input` and
`output` partitioning
+ pub fn try_new(input: Partitioning, output: Partitioning) -> Result<Self> {
+ let input_count = input.partition_count();
+ let output_count = output.partition_count();
+ assert_ne!(input_count, 0);
+ assert_ne!(output_count, 0);
+
+ // TODO: metrics support
+ let partitioner = BatchPartitioner::try_new(output,
Default::default())?;
+
+ let state = Mutex::new(RepartitionState {
+ partitioner,
+ partition_closed: vec![false; input_count],
+ input_closed: false,
+ output_buffers: (0..output_count).map(|_|
Default::default()).collect(),
+ });
+
+ Ok(Self {
+ state,
+ output_count,
+ })
+ }
+}
+
+/// Mutable state of a [`RepartitionPipeline`], guarded by its mutex
+struct RepartitionState {
+    /// Splits each pushed batch across the output partitions
+    partitioner: BatchPartitioner,
+    /// Whether each input partition has been closed, indexed by input partition
+    partition_closed: Vec<bool>,
+    /// Set once every entry of `partition_closed` is `true`
+    input_closed: bool,
+    /// Buffered batches and waiting wakers, indexed by output partition
+    output_buffers: Vec<OutputBuffer>,
+}
+
+impl std::fmt::Debug for RepartitionState {
+    /// Formats only the closed-ness flags; the partitioner and the buffered
+    /// batches are omitted
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let Self {
+            partition_closed,
+            input_closed,
+            ..
+        } = self;
+        f.debug_struct("RepartitionState")
+            .field("partition_closed", partition_closed)
+            .field("input_closed", input_closed)
+            .finish()
+    }
+}
+
+impl Pipeline for RepartitionPipeline {
+    /// Repartition `input`, appending each resulting batch to the buffer of its
+    /// output partition and waking any task waiting on that partition
+    ///
+    /// # Panics
+    ///
+    /// Panics if `child` is not 0, or if input `partition` is out of range or
+    /// has already been closed
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
+        // This pipeline has a single upstream input
+        assert_eq!(child, 0);
+
+        let mut state = self.state.lock();
+        assert!(
+            !state.partition_closed[partition],
+            "attempt to push to closed partition {} of RepartitionPipeline({:?})",
+            partition, state
+        );
+
+        // Reborrow through the guard once, so the closure below can borrow
+        // `output_buffers` while `partitioner` is borrowed mutably
+        let state = &mut *state;
+        // Within the closure `partition` is the *output* partition index
+        state.partitioner.partition(input, |partition, batch| {
+            state.output_buffers[partition].push_batch(batch);
+            Ok(())
+        })
+    }
+
+    /// Mark input `partition` as exhausted; once every input partition is
+    /// closed, wake all tasks waiting on any output partition
+    ///
+    /// # Panics
+    ///
+    /// Panics if `child` is not 0, or if input `partition` is out of range or
+    /// has already been closed
+    fn close(&self, child: usize, partition: usize) {
+        assert_eq!(child, 0);
+
+        let mut state = self.state.lock();
+        assert!(
+            !state.partition_closed[partition],
+            "attempt to close already closed partition {} of RepartitionPipeline({:?})",
+            partition, state
+        );
+
+        state.partition_closed[partition] = true;
+
+        // If all input streams exhausted, wake outputs
+        if state.partition_closed.iter().all(|x| *x) {
+            state.input_closed = true;
+            for buffer in &mut state.output_buffers {
+                for waker in buffer.wait_list.drain(..) {
+                    waker.wake()
+                }
+            }
+        }
+    }
+
+    /// Returns the partition count of the `output` partitioning given to
+    /// [`RepartitionPipeline::try_new`]
+    fn output_partitions(&self) -> usize {
+        self.output_count
+    }
+
+    /// Pop the next buffered batch for output `partition`
+    ///
+    /// Returns `Ready(None)` once that buffer is empty and every input partition
+    /// has been closed; otherwise registers the current task's waker and returns
+    /// `Pending`
+    fn poll_partition(
+        &self,
+        cx: &mut Context<'_>,
+        partition: usize,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        let mut state = self.state.lock();
+        let input_closed = state.input_closed;
+        let buffer = &mut state.output_buffers[partition];
+
+        match buffer.batches.pop_front() {
+            Some(batch) => Poll::Ready(Some(Ok(batch))),
+            None if input_closed => Poll::Ready(None),
+            _ => {
+                buffer.wait_list.push(cx.waker().clone());
+                Poll::Pending
+            }
+        }
+    }
+}
+
+/// Per-output-partition state of a [`RepartitionPipeline`]
+#[derive(Debug, Default)]
+struct OutputBuffer {
+    /// Batches ready to be returned, in the order they were pushed
+    batches: VecDeque<RecordBatch>,
+    /// Wakers registered by `poll_partition` while no batch was available
+    wait_list: Vec<Waker>,
+}
+
+impl OutputBuffer {
+    /// Append `batch` to the back of the queue, then wake and forget every
+    /// registered waker
+    fn push_batch(&mut self, batch: RecordBatch) {
+        self.batches.push_back(batch);
+        self.wait_list.drain(..).for_each(Waker::wake);
+    }
+}
diff --git a/datafusion/core/src/scheduler/plan.rs
b/datafusion/core/src/scheduler/plan.rs
new file mode 100644
index 000000000..e7d5e1d33
--- /dev/null
+++ b/datafusion/core/src/scheduler/plan.rs
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::SchemaRef;
+use std::sync::Arc;
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::{ExecutionPlan, Partitioning};
+
+use crate::scheduler::pipeline::{
+ execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
+};
+
+/// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct OutputLink {
+    /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to
+    pub pipeline: usize,
+
+    /// The child of the [`Pipeline`] to route output to
+    ///
+    /// This is passed as the `child` argument of [`Pipeline::push`] and
+    /// [`Pipeline::close`] on the destination pipeline
+    pub child: usize,
+}
+
+/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output
+#[derive(Debug)]
+pub struct RoutablePipeline {
+    /// The pipeline that produces data
+    pub pipeline: Box<dyn Pipeline>,
+
+    /// Where to send the output of `pipeline`
+    ///
+    /// If `None`, the output should be sent to the query output
+    pub output: Option<OutputLink>,
+}
+
+/// [`PipelinePlan`] is the scheduler's representation of the [`ExecutionPlan`] passed to
+/// [`super::Scheduler::schedule`]. It combines the list of [`Pipeline`] with the information
+/// necessary to route output from one stage to the next
+#[derive(Debug)]
+pub struct PipelinePlan {
+    /// Schema of this plan's output
+    pub schema: SchemaRef,
+
+    /// Number of output partitions
+    pub output_partitions: usize,
+
+    /// Pipelines that comprise this plan, addressed by [`OutputLink::pipeline`]
+    pub pipelines: Vec<RoutablePipeline>,
+}
+
+/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group
+/// together multiple operators, [`OperatorGroup`] stores this state
+struct OperatorGroup {
+    /// Where to route the output of the eventual [`Pipeline`]
+    output: Option<OutputLink>,
+
+    /// The [`ExecutionPlan`] from which to start recursing
+    root: Arc<dyn ExecutionPlan>,
+
+    /// The number of times to recurse into the [`ExecutionPlan`]'s children
+    ///
+    /// Starts at 0 for a group containing only `root`, and is incremented for
+    /// each single-child descendant added to the group
+    depth: usize,
+}
+
+/// A utility struct to assist converting from [`ExecutionPlan`] to [`PipelinePlan`]
+///
+/// The [`ExecutionPlan`] is visited in a depth-first fashion, gradually building
+/// up the [`RoutablePipeline`] for the [`PipelinePlan`]. As nodes are visited depth-first,
+/// a node is visited only after its parent has been.
+pub struct PipelinePlanner {
+    /// Context passed to every [`ExecutionPipeline`] this planner creates
+    task_context: Arc<TaskContext>,
+
+    /// The schema of this plan
+    schema: SchemaRef,
+
+    /// The number of output partitions of this plan
+    output_partitions: usize,
+
+    /// The current list of completed pipelines
+    completed: Vec<RoutablePipeline>,
+
+    /// A stack of [`ExecutionPlan`] still to visit, along with
+    /// where they should route their output
+    to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>,
+
+    /// Stores one or more operators to combine
+    /// together into a single [`ExecutionPipeline`]
+    execution_operators: Option<OperatorGroup>,
+}
+
+impl PipelinePlanner {
+ pub fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>)
-> Self {
+ let schema = plan.schema();
+ let output_partitions = plan.output_partitioning().partition_count();
+ Self {
+ completed: vec![],
+ to_visit: vec![(plan, None)],
+ task_context,
+ execution_operators: None,
+ schema,
+ output_partitions,
+ }
+ }
+
+ /// Flush the current group of operators stored in `execution_operators`
+ /// into a single [`ExecutionPipeline]
+ fn flush_exec(&mut self) -> Result<usize> {
+ let group = self.execution_operators.take().unwrap();
+ let node_idx = self.completed.len();
+ self.completed.push(RoutablePipeline {
+ pipeline: Box::new(ExecutionPipeline::new(
+ group.root,
+ self.task_context.clone(),
+ group.depth,
+ )?),
+ output: group.output,
+ });
+ Ok(node_idx)
+ }
+
+ /// Visit a non-special cased [`ExecutionPlan`]
+ fn visit_exec(
+ &mut self,
+ plan: Arc<dyn ExecutionPlan>,
+ parent: Option<OutputLink>,
+ ) -> Result<()> {
+ let children = plan.children();
+
+ // Add the operator to the current group of operators to be combined
+ // into a single [`ExecutionPipeline`].
+ //
+ // TODO: More sophisticated policy, just because we can combine them
doesn't mean we should
+ match self.execution_operators.as_mut() {
+ Some(buffer) => {
+ assert_eq!(parent, buffer.output, "QueryBuilder out of sync");
+ buffer.depth += 1;
+ }
+ None => {
+ self.execution_operators = Some(OperatorGroup {
+ output: parent,
+ root: plan,
+ depth: 0,
+ })
+ }
+ }
+
+ match children.len() {
+ 1 => {
+ // Enqueue the children with the parent of the `OperatorGroup`
+ self.to_visit
+ .push((children.into_iter().next().unwrap(), parent))
+ }
+ _ => {
+ // We can only recursively group through nodes with a single
child, therefore
+ // if this node has multiple children, we now need to flush
the buffer and
+ // enqueue its children with this new pipeline as its parent
+ let node = self.flush_exec()?;
+ self.enqueue_children(children, node);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Add the given list of children to the stack of [`ExecutionPlan`] to
visit
+ fn enqueue_children(
+ &mut self,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ parent_node_idx: usize,
+ ) {
+ for (child_idx, child) in children.into_iter().enumerate() {
+ self.to_visit.push((
+ child,
+ Some(OutputLink {
+ pipeline: parent_node_idx,
+ child: child_idx,
+ }),
+ ))
+ }
+ }
+
+ /// Push a new [`RoutablePipeline`] and enqueue its children to be visited
+ fn push_pipeline(
+ &mut self,
+ node: RoutablePipeline,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) {
+ let node_idx = self.completed.len();
+ self.completed.push(node);
+ self.enqueue_children(children, node_idx)
+ }
+
+ /// Push a new [`RepartitionPipeline`] first flushing any buffered
[`OperatorGroup`]
+ fn push_repartition(
+ &mut self,
+ input: Partitioning,
+ output: Partitioning,
+ parent: Option<OutputLink>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<()> {
+ let parent = match &self.execution_operators {
+ Some(buffer) => {
+ assert_eq!(buffer.output, parent, "QueryBuilder out of sync");
+ Some(OutputLink {
+ pipeline: self.flush_exec()?,
+ child: 0, // Must be the only child
+ })
+ }
+ None => parent,
+ };
+
+ let node = Box::new(RepartitionPipeline::try_new(input, output)?);
+ self.push_pipeline(
+ RoutablePipeline {
+ pipeline: node,
+ output: parent,
+ },
+ children,
+ );
+ Ok(())
+ }
+
+ /// Visit an [`ExecutionPlan`] operator and add it to the [`PipelinePlan`]
being built
+ fn visit_operator(
+ &mut self,
+ plan: Arc<dyn ExecutionPlan>,
+ parent: Option<OutputLink>,
+ ) -> Result<()> {
+ if let Some(repartition) =
plan.as_any().downcast_ref::<RepartitionExec>() {
+ self.push_repartition(
+ repartition.input().output_partitioning(),
+ repartition.output_partitioning(),
+ parent,
+ repartition.children(),
+ )
+ } else if let Some(coalesce) =
+ plan.as_any().downcast_ref::<CoalescePartitionsExec>()
+ {
+ self.push_repartition(
+ coalesce.input().output_partitioning(),
+ Partitioning::RoundRobinBatch(1),
+ parent,
+ coalesce.children(),
+ )
+ } else {
+ self.visit_exec(plan, parent)
+ }
+ }
+
+ /// Build a [`PipelinePlan`] from the [`ExecutionPlan`] provided to
[`PipelinePlanner::new`]
+ ///
+ /// This will group all operators possible into a single
[`ExecutionPipeline`], only
+ /// creating new pipelines when:
+ ///
+ /// - encountering an operator with multiple children
+ /// - encountering a repartitioning operator
+ ///
+ /// This latter case is because currently the repartitioning operators in
DataFusion are
+ /// coupled with the non-scheduler-based parallelism story
+ ///
+ /// The above logic is liable to change, is considered an implementation
detail of the
+ /// scheduler, and should not be relied upon by operators
+ ///
+ pub fn build(mut self) -> Result<PipelinePlan> {
+ // We do a depth-first scan of the operator tree, extracting a list of
[`QueryNode`]
+ while let Some((plan, parent)) = self.to_visit.pop() {
+ self.visit_operator(plan, parent)?;
+ }
+
+ if self.execution_operators.is_some() {
+ self.flush_exec()?;
+ }
+
+ Ok(PipelinePlan {
+ schema: self.schema,
+ output_partitions: self.output_partitions,
+ pipelines: self.completed,
+ })
+ }
+}
diff --git a/datafusion/core/src/scheduler/task.rs
b/datafusion/core/src/scheduler/task.rs
new file mode 100644
index 000000000..e91985437
--- /dev/null
+++ b/datafusion/core/src/scheduler/task.rs
@@ -0,0 +1,497 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use crate::scheduler::{
+ is_worker, plan::PipelinePlan, spawn_local, spawn_local_fifo,
RoutablePipeline,
+ Spawner,
+};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
+use arrow::record_batch::RecordBatch;
+use futures::channel::mpsc;
+use futures::task::ArcWake;
+use futures::{ready, Stream, StreamExt};
+use log::{debug, trace};
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
+
+/// Spawns a [`PipelinePlan`] using the provided [`Spawner`]
+///
+/// One [`Task`] is spawned for every output partition of every pipeline in
+/// `plan`, and one unbounded channel is created per query output partition to
+/// carry results to the returned [`ExecutionResults`]
+pub fn spawn_plan(plan: PipelinePlan, spawner: Spawner) -> ExecutionResults {
+    debug!("Spawning pipeline plan: {:#?}", plan);
+
+    let (senders, receivers) = (0..plan.output_partitions)
+        .map(|_| mpsc::unbounded())
+        .unzip::<_, _, Vec<_>, Vec<_>>();
+
+    let context = Arc::new(ExecutionContext {
+        spawner,
+        pipelines: plan.pipelines,
+        schema: plan.schema,
+        output: senders,
+    });
+
+    for (pipeline_idx, query_pipeline) in context.pipelines.iter().enumerate() {
+        for partition in 0..query_pipeline.pipeline.output_partitions() {
+            context.spawner.spawn(Task {
+                context: context.clone(),
+                waker: Arc::new(TaskWaker {
+                    context: Arc::downgrade(&context),
+                    // Non-zero as the task is being enqueued right now, so
+                    // wakeups before its first poll do not enqueue it again
+                    wake_count: AtomicUsize::new(1),
+                    pipeline: pipeline_idx,
+                    partition,
+                }),
+            });
+        }
+    }
+
+    let partitions = receivers
+        .into_iter()
+        .map(|receiver| ExecutionResultStream {
+            receiver,
+            context: context.clone(),
+        })
+        .collect();
+
+    ExecutionResults {
+        streams: partitions,
+        context,
+    }
+}
+
+/// A [`Task`] identifies an output partition within a given pipeline that may be able to
+/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding
+/// [`Task`] and distributes them amongst its worker threads.
+pub struct Task {
+    /// Maintain a link to the [`ExecutionContext`], which is necessary to be able to
+    /// route the output of the partition to its destination
+    context: Arc<ExecutionContext>,
+
+    /// A [`ArcWake`] that can be used to re-schedule this [`Task`] for execution
+    ///
+    /// It also records which pipeline and output partition this task polls
+    waker: Arc<TaskWaker>,
+}
+
+impl std::fmt::Debug for Task {
+    /// Formats the pipeline index, partition, and output link of this task
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let waker = &self.waker;
+        let output = &self.context.pipelines[waker.pipeline].output;
+        f.debug_struct("Task")
+            .field("pipeline", &waker.pipeline)
+            .field("partition", &waker.partition)
+            .field("output", output)
+            .finish()
+    }
+}
+
+impl Task {
+    /// Report `error` to the query output, then, if this pipeline routes its
+    /// output to another pipeline, close the corresponding input partition of
+    /// that downstream pipeline
+    ///
+    /// NOTE(review): `partition` is this pipeline's output partition; for an
+    /// intermediate pipeline it need not be a valid query output partition
+    /// index, so confirm `send_query_output` tolerates that
+    fn handle_error(
+        &self,
+        partition: usize,
+        routable: &RoutablePipeline,
+        error: DataFusionError,
+    ) {
+        self.context.send_query_output(partition, Err(error));
+        if let Some(link) = routable.output {
+            trace!(
+                "Closing pipeline: {:?}, partition: {}, due to error",
+                link,
+                self.waker.partition,
+            );
+
+            self.context.pipelines[link.pipeline]
+                .pipeline
+                .close(link.child, self.waker.partition);
+        }
+    }
+
+    /// Call [`Pipeline::poll_partition`], attempting to make progress on query execution
+    ///
+    /// - On a batch: route it to the linked pipeline (or the query output) and
+    ///   reschedule this task
+    /// - On an error (from polling or from pushing downstream): report it via
+    ///   `handle_error` and do not reschedule
+    /// - On exhaustion: close the linked input partition, or finish the query
+    ///   output partition
+    /// - On `Pending`: reset the wake count, rescheduling only if a wakeup
+    ///   arrived while polling
+    ///
+    /// Does nothing if the query has been cancelled.
+    ///
+    /// # Panics
+    ///
+    /// Panics if called outside of the worker pool
+    pub fn do_work(self) {
+        assert!(is_worker(), "Task::do_work called outside of worker pool");
+        if self.context.is_cancelled() {
+            return;
+        }
+
+        // Capture the wake count prior to calling [`Pipeline::poll_partition`]
+        // this allows us to detect concurrent wake ups and handle them correctly
+        let wake_count = self.waker.wake_count.load(Ordering::SeqCst);
+
+        let node = self.waker.pipeline;
+        let partition = self.waker.partition;
+
+        let waker = futures::task::waker_ref(&self.waker);
+        let mut cx = Context::from_waker(&*waker);
+
+        let pipelines = &self.context.pipelines;
+        let routable = &pipelines[node];
+        match routable.pipeline.poll_partition(&mut cx, partition) {
+            Poll::Ready(Some(Ok(batch))) => {
+                trace!("Poll {:?}: Ok: {}", self, batch.num_rows());
+                match routable.output {
+                    Some(link) => {
+                        trace!(
+                            "Publishing batch to pipeline {:?} partition {}",
+                            link,
+                            partition
+                        );
+
+                        let r = pipelines[link.pipeline]
+                            .pipeline
+                            .push(batch, link.child, partition);
+
+                        if let Err(e) = r {
+                            self.handle_error(partition, routable, e);
+
+                            // Return without rescheduling this output again
+                            return;
+                        }
+                    }
+                    None => {
+                        trace!("Publishing batch to output");
+                        self.context.send_query_output(partition, Ok(batch))
+                    }
+                }
+
+                // Reschedule this pipeline again
+                //
+                // We want to prioritise running tasks triggered by the most recent
+                // batch, so reschedule with FIFO ordering
+                //
+                // Note: We must schedule after we have routed the batch, otherwise
+                // we introduce a potential ordering race where the newly scheduled
+                // task runs before this task finishes routing the output
+                spawn_local_fifo(self);
+            }
+            Poll::Ready(Some(Err(e))) => {
+                trace!("Poll {:?}: Error: {:?}", self, e);
+                self.handle_error(partition, routable, e)
+            }
+            Poll::Ready(None) => {
+                trace!("Poll {:?}: None", self);
+                match routable.output {
+                    Some(link) => {
+                        trace!("Closing pipeline: {:?}, partition: {}", link, partition);
+                        pipelines[link.pipeline]
+                            .pipeline
+                            .close(link.child, partition)
+                    }
+                    None => self.context.finish(partition),
+                }
+            }
+            Poll::Pending => {
+                trace!("Poll {:?}: Pending", self);
+                // Attempt to reset the wake count with the value obtained prior
+                // to calling [`Pipeline::poll_partition`].
+                //
+                // If this fails it indicates a wakeup was received whilst executing
+                // [`Pipeline::poll_partition`] and we should reschedule the task
+                let reset = self.waker.wake_count.compare_exchange(
+                    wake_count,
+                    0,
+                    Ordering::SeqCst,
+                    Ordering::SeqCst,
+                );
+
+                if reset.is_err() {
+                    trace!("Wakeup triggered whilst polling: {:?}", self);
+                    spawn_local(self);
+                }
+            }
+        }
+    }
+}
+
+/// The results of the execution of a query
+///
+/// Created by `spawn_plan`, holding one result stream per query output partition
+pub struct ExecutionResults {
+    /// [`ExecutionResultStream`] for each partition of this query
+    streams: Vec<ExecutionResultStream>,
+
+    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
+    context: Arc<ExecutionContext>,
+}
+
+impl ExecutionResults {
+    /// Returns a [`SendableRecordBatchStream`] of this execution
+    ///
+    /// In the event of multiple output partitions, the output will be interleaved
+    pub fn stream(self) -> SendableRecordBatchStream {
+        let Self { streams, context } = self;
+        let merged = futures::stream::select_all(streams);
+        Box::pin(RecordBatchStreamAdapter::new(context.schema.clone(), merged))
+    }
+
+    /// Returns a [`SendableRecordBatchStream`] for each partition of this execution,
+    /// in partition order
+    pub fn stream_partitioned(self) -> Vec<SendableRecordBatchStream> {
+        let mut partitions: Vec<SendableRecordBatchStream> =
+            Vec::with_capacity(self.streams.len());
+        for stream in self.streams {
+            partitions.push(Box::pin(stream));
+        }
+        partitions
+    }
+}
+
+/// A result stream for the execution of a query
+///
+/// Receives the results sent for a single query output partition
+struct ExecutionResultStream {
+    /// `Some(result)` carries a batch or error; `None` marks the partition finished
+    receiver: mpsc::UnboundedReceiver<Option<Result<RecordBatch>>>,
+
+    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
+    context: Arc<ExecutionContext>,
+}
+
+impl Stream for ExecutionResultStream {
+    type Item = arrow::error::Result<RecordBatch>;
+
+    /// Yields the next batch or error sent for this partition, converting
+    /// errors to [`ArrowError::ExternalError`]. The stream ends on a `None`
+    /// marker, or when the channel itself is exhausted
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let message = ready!(self.receiver.poll_next_unpin(cx));
+        let item = match message.flatten() {
+            Some(Ok(batch)) => Some(Ok(batch)),
+            Some(Err(e)) => Some(Err(ArrowError::ExternalError(Box::new(e)))),
+            None => None,
+        };
+        Poll::Ready(item)
+    }
+}
+
+impl RecordBatchStream for ExecutionResultStream {
+    /// Returns the schema of the query output, shared with the execution context
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.context.schema)
+    }
+}
+
+/// The shared state of all [`Task`] created from the same [`PipelinePlan`]
+#[derive(Debug)]
+struct ExecutionContext {
+    /// Spawner for this query
+    spawner: Spawner,
+
+    /// List of pipelines that belong to this query, pipelines are addressed
+    /// based on their index within this list
+    pipelines: Vec<RoutablePipeline>,
+
+    /// Schema of this plan's output
+    pub schema: SchemaRef,
+
+    /// The output streams, per partition, for this query's execution
+    ///
+    /// `Some(result)` is sent for each batch or error, and `None` to mark the
+    /// partition as finished
+    output: Vec<mpsc::UnboundedSender<Option<Result<RecordBatch>>>>,
+}
+
+impl Drop for ExecutionContext {
+    /// Logs at debug level when the shared execution state is released
+    fn drop(&mut self) {
+        debug!("ExecutionContext dropped");
+    }
+}
+
+impl ExecutionContext {
+    /// Returns `true` if this query has been dropped, specifically if the
+    /// stream returned by [`super::Scheduler::schedule`] has been dropped
+    ///
+    /// This is the case once every output channel is closed
+    fn is_cancelled(&self) -> bool {
+        self.output.iter().all(|x| x.is_closed())
+    }
+
+    /// Sends `output` to this query's output stream
+    ///
+    /// Errors raised by intermediate pipelines are reported using that pipeline's
+    /// partition index, which can exceed the number of query output partitions
+    /// (e.g. beneath a `CoalescePartitionsExec`, which has one output partition).
+    /// Such indices are wrapped into range rather than panicking. If the query
+    /// has no output partitions, `output` is dropped.
+    fn send_query_output(&self, partition: usize, output: Result<RecordBatch>) {
+        let partitions = self.output.len();
+        if partitions == 0 {
+            return;
+        }
+        // A failed send means the receiver has gone away; ignore it deliberately
+        let _ = self.output[partition % partitions].unbounded_send(Some(output));
+    }
+
+    /// Mark this partition as finished by sending `None` to its output stream
+    fn finish(&self, partition: usize) {
+        let _ = self.output[partition].unbounded_send(None);
+    }
+}
+
+/// The [`ArcWake`] used to reschedule a [`Task`], identifying the pipeline and
+/// output partition that task polls
+struct TaskWaker {
+    /// Store a weak reference to the [`ExecutionContext`] to avoid reference cycles if this
+    /// [`Waker`] is stored within a [`Pipeline`] owned by the [`ExecutionContext`]
+    context: Weak<ExecutionContext>,
+
+    /// A counter that stores the number of times this has been awoken
+    ///
+    /// A value > 0, implies the task is either in the ready queue or
+    /// currently being executed
+    ///
+    /// `TaskWaker::wake` always increments the `wake_count`, however, it only
+    /// re-enqueues the [`Task`] if the value prior to increment was 0
+    ///
+    /// This ensures that a given [`Task`] is not enqueued multiple times
+    ///
+    /// We store an integer, as opposed to a boolean, so that wake ups that
+    /// occur during [`Pipeline::poll_partition`] can be detected and handled
+    /// after it has finished executing
+    ///
+    wake_count: AtomicUsize,
+
+    /// The index of the pipeline within the [`ExecutionContext`]'s pipelines to poll
+    pipeline: usize,
+
+    /// The output partition of that pipeline to poll
+    partition: usize,
+}
+
+impl ArcWake for TaskWaker {
+    /// Increment `wake_count` and, only if it was previously 0, enqueue a new
+    /// [`Task`] for this pipeline partition, provided the [`ExecutionContext`]
+    /// is still alive
+    ///
+    /// On a worker thread the task goes to that worker's local queue; otherwise
+    /// it is spawned via the context's spawner
+    fn wake(self: Arc<Self>) {
+        if self.wake_count.fetch_add(1, Ordering::SeqCst) != 0 {
+            trace!("Ignoring duplicate wakeup");
+            return;
+        }
+
+        if let Some(context) = self.context.upgrade() {
+            let task = Task {
+                context,
+                waker: self.clone(),
+            };
+
+            trace!("Wakeup {:?}", task);
+
+            // If called from a worker, spawn to the current worker's
+            // local queue, otherwise reschedule on any worker
+            match is_worker() {
+                true => spawn_local(task),
+                false => task.context.spawner.clone().spawn(task),
+            }
+        } else {
+            trace!("Dropped wakeup");
+        }
+    }
+
+    /// Delegates to [`ArcWake::wake`] on a clone of the `Arc`
+    fn wake_by_ref(s: &Arc<Self>) {
+        ArcWake::wake(s.clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::error::Result;
+    use crate::scheduler::{pipeline::Pipeline, plan::RoutablePipeline, Scheduler};
+    use arrow::array::{ArrayRef, Int32Array};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use futures::{channel::oneshot, ready, FutureExt, StreamExt};
+    use parking_lot::Mutex;
+    use std::fmt::Debug;
+    use std::time::Duration;
+
+    /// Tests that waker can be sent to tokio pool
+    ///
+    /// A single-partition [`Pipeline`] with no inputs, whose only batch is
+    /// produced by a task spawned on `handle`
+    #[derive(Debug)]
+    struct TokioPipeline {
+        /// Runtime on which the batch-producing task is spawned
+        handle: tokio::runtime::Handle,
+        /// Progress of the single output partition
+        state: Mutex<State>,
+    }
+
+    /// Progress of [`TokioPipeline`]'s output partition
+    #[derive(Debug)]
+    enum State {
+        /// Nothing has been spawned yet
+        Init,
+        /// Waiting for the spawned task to send its result
+        Wait(oneshot::Receiver<Result<RecordBatch>>),
+        /// The result has been returned
+        Finished,
+    }
+
+    impl Default for State {
+        fn default() -> Self {
+            Self::Init
+        }
+    }
+
+    impl Pipeline for TokioPipeline {
+        /// Never called: this pipeline has no inputs
+        fn push(
+            &self,
+            _input: RecordBatch,
+            _child: usize,
+            _partition: usize,
+        ) -> Result<()> {
+            unreachable!()
+        }
+
+        fn close(&self, _child: usize, _partition: usize) {}
+
+        fn output_partitions(&self) -> usize {
+            1
+        }
+
+        /// On first poll, spawns a tokio task that sends a 3-row batch after
+        /// 10ms; then waits on it, returning the received result once and
+        /// `None` on every later poll
+        fn poll_partition(
+            &self,
+            cx: &mut Context<'_>,
+            _partition: usize,
+        ) -> Poll<Option<Result<RecordBatch>>> {
+            let mut state = self.state.lock();
+            loop {
+                match &mut *state {
+                    State::Init => {
+                        let (sender, receiver) = oneshot::channel();
+                        self.handle.spawn(async move {
+                            tokio::time::sleep(Duration::from_millis(10)).await;
+                            let array = Int32Array::from_iter_values([1, 2, 3]);
+                            sender.send(
+                                RecordBatch::try_from_iter([(
+                                    "int",
+                                    Arc::new(array) as ArrayRef,
+                                )])
+                                .map_err(DataFusionError::ArrowError),
+                            )
+                        });
+                        *state = State::Wait(receiver)
+                    }
+                    State::Wait(r) => {
+                        // `ok()` turns a dropped sender into `None`
+                        let v = ready!(r.poll_unpin(cx)).ok();
+                        *state = State::Finished;
+                        return Poll::Ready(v);
+                    }
+                    State::Finished => return Poll::Ready(None),
+                }
+            }
+        }
+    }
+
+    /// Checks that a wakeup issued from outside the worker pool (here, from a
+    /// tokio runtime) reschedules the task, and the batch reaches the output
+    #[test]
+    fn test_tokio_waker() {
+        let scheduler = Scheduler::new(2);
+
+        // A tokio runtime
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_time()
+            .build()
+            .unwrap();
+
+        // A pipeline that dispatches to a tokio worker
+        let pipeline = TokioPipeline {
+            handle: runtime.handle().clone(),
+            state: Default::default(),
+        };
+
+        let plan = PipelinePlan {
+            schema: Arc::new(Schema::new(vec![Field::new(
+                "int",
+                DataType::Int32,
+                false,
+            )])),
+            output_partitions: 1,
+            pipelines: vec![RoutablePipeline {
+                pipeline: Box::new(pipeline),
+                output: None,
+            }],
+        };
+
+        let mut receiver = scheduler.schedule_plan(plan).stream();
+
+        runtime.block_on(async move {
+            // Should wait for output
+            let batch = receiver.next().await.unwrap().unwrap();
+            assert_eq!(batch.num_rows(), 3);
+
+            // Next batch should be none
+            assert!(receiver.next().await.is_none());
+        })
+    }
+}