alamb commented on code in PR #2226:
URL: https://github.com/apache/arrow-datafusion/pull/2226#discussion_r851347286


##########
datafusion/scheduler/src/pipeline/mod.rs:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::ArrowResult;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may draw on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Push vs Pull Execution
+///
+/// Whilst the interface exposed to the scheduler is push-based, in which member functions

Review Comment:
   ```suggestion
   /// Whilst the interface exposed to the scheduler is push-based, the order of member function
   ```
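
The hunk above cuts off before the trait definition itself; a rough sketch of a push-based interface matching this doc comment might look like the following (the method names and exact signatures are assumptions, not necessarily the PR's actual trait):

```rust
// Sketch only: signatures inferred from the doc comment above, in which
// input partitions are addressed by (child, partition) and output
// partitions by a single partition index
use std::task::{Context, Poll};

use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

use crate::ArrowResult;

pub trait Pipeline: Send + Sync {
    /// Push a batch to the input partition identified by the index of the
    /// child pipeline it came from and the partition index within it
    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;

    /// Mark an input partition as exhausted
    fn close(&self, child: usize, partition: usize);

    /// The number of output partitions this pipeline produces
    fn output_partitions(&self) -> usize;

    /// Poll an output partition for data, registering the waker in `cx`
    /// if none is currently available
    fn poll_partition(
        &self,
        cx: &mut Context<'_>,
        partition: usize,
    ) -> Poll<Option<ArrowResult<RecordBatch>>>;
}
```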



##########
datafusion/scheduler/src/pipeline/execution.rs:
##########
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+use arrow::error::ArrowError;
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::{
+    displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+
+use crate::pipeline::Pipeline;
+use crate::BoxStream;
+
+/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
+/// converts it to the push-based [`Pipeline`] interface
+///
+/// Internally [`ExecutionPipeline`] is still pull-based which limits its parallelism
+/// to that of its output partitioning, however, it provides full compatibility with
+/// [`ExecutionPlan`] allowing full interoperability with the existing ecosystem
+///
+/// Longer term we will likely want to introduce new traits that differentiate between
+/// stateless operators like filters, and stateful aggregations, and are better aligned
+/// with a push-based execution model. This in turn will allow for [`Pipeline`] implementations
+/// that are able to introduce parallelism beyond that expressed in their partitioning

Review Comment:
   I still don't understand this distinction between stateful and non-stateful operators.
   
   Maybe the difference is the "pipeline breaking" ability (the standard database term for operators that may not produce output until they have seen some/all of their input)
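
To make the "pipeline breaking" distinction concrete, here is a minimal sketch of an operator that cannot produce output until it has seen all of its input; a filter, by contrast, could emit on every push. All names here are hypothetical:

```rust
use arrow::record_batch::RecordBatch;
use parking_lot::Mutex;

/// Hypothetical pipeline-breaking operator: a COUNT(*) style aggregate
/// that buffers state on every push and only emits once input is closed
struct CountPipeline {
    count: Mutex<usize>,
}

impl CountPipeline {
    fn push(&self, input: &RecordBatch) {
        // Each push only updates internal state; no output can be produced
        // yet, as later batches may still arrive
        *self.count.lock() += input.num_rows();
    }

    fn close(&self) -> usize {
        // Only once the input is exhausted can the final count be emitted
        *self.count.lock()
    }
}
```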



##########
datafusion/scheduler/src/lib.rs:
##########
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{BoxStream, StreamExt};
+use log::{debug, error};
+
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::query::Query;
+use crate::task::{spawn_query, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+mod pipeline;
+mod query;
+mod task;
+
+/// Builder for a [`Scheduler`]
+#[derive(Debug)]
+pub struct SchedulerBuilder {
+    inner: ThreadPoolBuilder,
+}
+
+impl SchedulerBuilder {
+    /// Create a new [`SchedulerBuilder`] with the provided number of threads
+    pub fn new(num_threads: usize) -> Self {
+        let builder = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .panic_handler(|p| error!("{}", format_worker_panic(p)))
+            .thread_name(|idx| format!("df-worker-{}", idx));
+
+        Self { inner: builder }
+    }
+
+    /// Registers a custom panic handler
+    ///
+    /// Used by tests
+    #[allow(dead_code)]
+    fn panic_handler<H>(self, panic_handler: H) -> Self
+    where
+        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
+    {
+        Self {
+            inner: self.inner.panic_handler(panic_handler),
+        }
+    }
+
+    /// Build a new [`Scheduler`]
+    fn build(self) -> Scheduler {
+        Scheduler {
+            pool: Arc::new(self.inner.build().unwrap()),
+        }
+    }
+}
+
+/// A [`Scheduler`] maintains a pool of dedicated worker threads on which
+/// query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+/// and is designed to decouple the execution parallelism from the parallelism expressed in
+/// the physical plan as partitions.
+///
+/// # Implementation
+///
+/// When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+/// chunks called pipelines. Each pipeline may consist of one or more nodes from the
+/// [`ExecutionPlan`] tree.
+///
+/// The scheduler then maintains a list of pending [`Task`]s that identify a partition within
+/// a particular pipeline that may be able to make progress on some "morsel" of data. These
+/// [`Task`]s are then scheduled on the worker pool, with a preference for scheduling work
+/// on a given "morsel" on the same thread that produced it.
+///
+/// # Rayon
+///
+/// Under the hood these [`Task`]s are scheduled by [rayon], which is a lightweight, work-stealing
+/// scheduler optimised for CPU-bound workloads. Pipelines may exploit this fact, and use [rayon]'s
+/// structured concurrency primitives to express additional parallelism that may be exploited
+/// if there are idle threads available at runtime
+///
+/// # Shutdown
+///
+/// TBC
+///
+/// [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+/// [rayon]: https://docs.rs/rayon/latest/rayon/
+///
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool
+    pub fn new(num_threads: usize) -> Self {
+        SchedulerBuilder::new(num_threads).build()
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns a [`BoxStream`] that can be used to receive results as they are produced
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<BoxStream<'static, ArrowResult<RecordBatch>>> {
+        let (query, receiver) = Query::new(plan, context, self.spawner())?;
+        spawn_query(Arc::new(query));
+        Ok(receiver.boxed())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Formats a panic message for a worker
+fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
+    let maybe_idx = rayon::current_thread_index();
+    let worker: &dyn std::fmt::Display = match &maybe_idx {
+        Some(idx) => idx,
+        None => &"UNKNOWN",
+    };
+
+    let message = if let Some(msg) = panic.downcast_ref::<&str>() {
+        *msg
+    } else if let Some(msg) = panic.downcast_ref::<String>() {
+        msg.as_str()
+    } else {
+        "UNKNOWN"
+    };
+
+    format!("worker {} panicked with: {}", worker, message)
+}
+
+/// Returns `true` if the current thread is a worker thread
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local worker thread pool
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,

Review Comment:
   👍 
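
Based on the `Scheduler::new` and `schedule` signatures in this hunk, driving a query might look roughly like this; the construction of `plan` and `context` is assumed to come from an existing DataFusion `SessionContext` and is elided:

```rust
use std::sync::Arc;

use futures::StreamExt;

use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;

// Sketch only: assumes `plan` and `context` were obtained elsewhere,
// e.g. from a SessionContext
async fn run(plan: Arc<dyn ExecutionPlan>, context: Arc<TaskContext>) -> Result<()> {
    // Four dedicated worker threads in the scheduler's pool
    let scheduler = Scheduler::new(4);

    // `schedule` returns a stream that yields batches as they are produced
    let mut stream = scheduler.schedule(plan, context)?;
    while let Some(batch) = stream.next().await {
        println!("received {} rows", batch?.num_rows());
    }
    Ok(())
}
```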



##########
datafusion/scheduler/src/pipeline/mod.rs:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::ArrowResult;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may draw on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Push vs Pull Execution
+///
+/// Whilst the interface exposed to the scheduler is push-based, in which member functions
+/// computation is performed is intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
+/// any output partitions that may now be able to make progress. This may be none none
+/// if the operator is waiting on data from a different input partition
+///
+/// An aggregation pipeline which combines data from one or more input partitions into
+/// a single output partition. [`Pipeline::push`] would eagerly update the computed
+/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output

Review Comment:
   There is another related strategy for hash aggregation, which is to do partial aggregation and, when the hash table is full, flush the output hash table and start again fresh (meaning many batch pushes produce no output, but some pushes would produce output - the contents of the hash table).
   
   Perhaps this is similar to the merge pipeline from a scheduler perspective
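
A minimal sketch of that flush-when-full strategy, assuming a hypothetical partial aggregation state (the threshold, types, and names are all invented for illustration):

```rust
use std::collections::HashMap;

/// Hypothetical partial aggregation state: most pushes only buffer into
/// the hash table, but a push that fills it flushes the partial results
struct PartialAggregate {
    groups: HashMap<u64, usize>,
    max_groups: usize,
}

impl PartialAggregate {
    /// Returns flushed partial aggregates if this push filled the table
    fn push(&mut self, keys: impl IntoIterator<Item = u64>) -> Option<Vec<(u64, usize)>> {
        for key in keys {
            *self.groups.entry(key).or_insert(0) += 1;
        }
        if self.groups.len() >= self.max_groups {
            // Flush the accumulated partial aggregates and start fresh
            Some(self.groups.drain().collect())
        } else {
            None
        }
    }
}
```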



##########
datafusion/scheduler/src/pipeline/mod.rs:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::ArrowResult;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may draw on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Push vs Pull Execution
+///
+/// Whilst the interface exposed to the scheduler is push-based, in which member functions
+/// computation is performed is intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
+/// and immediately wakes the corresponding output partition.
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
+/// any output partitions that may now be able to make progress. This may be none none

Review Comment:
   ```suggestion
   /// any output partitions that may now be able to make progress. This may be none
   ```
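
As a hedged illustration of the merge pattern described above, per-output buffering with waker registration might look like the sketch below; `MergeBuffer` and its methods are hypothetical, not part of the PR:

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll, Waker};

use arrow::record_batch::RecordBatch;
use parking_lot::Mutex;

/// Hypothetical buffer behind one output partition of a merge pipeline
struct MergeBuffer {
    batches: Mutex<VecDeque<RecordBatch>>,
    waker: Mutex<Option<Waker>>,
}

impl MergeBuffer {
    /// Called from `Pipeline::push`: buffer the batch and wake the output
    /// partition so it can make progress
    fn push(&self, batch: RecordBatch) {
        self.batches.lock().push_back(batch);
        if let Some(waker) = self.waker.lock().take() {
            waker.wake();
        }
    }

    /// Called when the output partition is polled: return a batch if one
    /// is buffered, otherwise register the waker and return `Pending`
    fn poll(&self, cx: &mut Context<'_>) -> Poll<RecordBatch> {
        match self.batches.lock().pop_front() {
            Some(batch) => Poll::Ready(batch),
            None => {
                *self.waker.lock() = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
```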



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
