This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0332eb569a refactor: move ExecutionPlan and related structs into 
dedicated mod (#11759)
0332eb569a is described below

commit 0332eb569a5428ac385fe892ce7b5fb40d52c8c0
Author: Ruihang Xia <[email protected]>
AuthorDate: Fri Aug 2 20:10:41 2024 +0800

    refactor: move ExecutionPlan and related structs into dedicated mod (#11759)
    
    Signed-off-by: Ruihang Xia <[email protected]>
---
 .../src/{lib.rs => execution_plan.rs}              |  54 +-
 datafusion/physical-plan/src/lib.rs                | 995 +--------------------
 2 files changed, 17 insertions(+), 1032 deletions(-)

diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/execution_plan.rs
similarity index 96%
copy from datafusion/physical-plan/src/lib.rs
copy to datafusion/physical-plan/src/execution_plan.rs
index 19554d07f7..5a3fc086c1 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -14,10 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
-#![deny(clippy::clone_on_ref_ptr)]
-
-//! Traits for physical query plan, supporting parallel execution for 
partitioned relations.
 
 use std::any::Any;
 use std::fmt::Debug;
@@ -54,47 +50,6 @@ use crate::repartition::RepartitionExec;
 use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
 pub use crate::stream::EmptyRecordBatchStream;
 use crate::stream::RecordBatchStreamAdapter;
-pub use crate::topk::TopK;
-pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
-
-mod ordering;
-mod topk;
-mod visitor;
-
-pub mod aggregates;
-pub mod analyze;
-pub mod coalesce_batches;
-pub mod coalesce_partitions;
-pub mod common;
-pub mod display;
-pub mod empty;
-pub mod explain;
-pub mod filter;
-pub mod insert;
-pub mod joins;
-pub mod limit;
-pub mod memory;
-pub mod metrics;
-pub mod placeholder_row;
-pub mod projection;
-pub mod recursive_query;
-pub mod repartition;
-pub mod sorts;
-pub mod spill;
-pub mod stream;
-pub mod streaming;
-pub mod tree_node;
-pub mod union;
-pub mod unnest;
-pub mod values;
-pub mod windows;
-pub mod work_table;
-
-pub mod udaf {
-    pub use datafusion_physical_expr_common::aggregate::{
-        create_aggregate_expr, create_aggregate_expr_with_dfschema, 
AggregateFunctionExpr,
-    };
-}
 
 /// Represent nodes in the DataFusion Physical Plan.
 ///
@@ -558,7 +513,7 @@ impl ExecutionMode {
 }
 
 /// Conservatively "combines" execution modes of a given collection of 
operators.
-fn execution_mode_from_children<'a>(
+pub(crate) fn execution_mode_from_children<'a>(
     children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
 ) -> ExecutionMode {
     let mut result = ExecutionMode::Bounded;
@@ -722,7 +677,7 @@ pub async fn collect(
     context: Arc<TaskContext>,
 ) -> Result<Vec<RecordBatch>> {
     let stream = execute_stream(plan, context)?;
-    common::collect(stream).await
+    crate::common::collect(stream).await
 }
 
 /// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
@@ -909,6 +864,7 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> 
Vec<String> {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use std::any::Any;
     use std::sync::Arc;
 
@@ -917,7 +873,7 @@ mod tests {
     use datafusion_common::{Result, Statistics};
     use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 
-    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
 
     #[derive(Debug)]
     pub struct EmptyExec;
@@ -1059,4 +1015,4 @@ mod tests {
     }
 }
 
-pub mod test;
+// pub mod test;
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index 19554d07f7..eeecc017c2 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -14,46 +14,36 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 // Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
+
 #![deny(clippy::clone_on_ref_ptr)]
 
 //! Traits for physical query plan, supporting parallel execution for 
partitioned relations.
+//!
+//! Entrypoint of this crate is trait [ExecutionPlan].
 
-use std::any::Any;
-use std::fmt::Debug;
-use std::sync::Arc;
-
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use futures::stream::{StreamExt, TryStreamExt};
-use tokio::task::JoinSet;
-
-use datafusion_common::config::ConfigOptions;
 pub use datafusion_common::hash_utils;
 pub use datafusion_common::utils::project_schema;
-use datafusion_common::{exec_err, Result};
 pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
-use datafusion_execution::TaskContext;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
 pub use datafusion_expr::{Accumulator, ColumnarValue};
 pub use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::PhysicalSortExpr;
 pub use datafusion_physical_expr::{
     expressions, functions, udf, AggregateExpr, Distribution, Partitioning, 
PhysicalExpr,
 };
-use datafusion_physical_expr::{
-    EquivalenceProperties, LexOrdering, PhysicalSortExpr, 
PhysicalSortRequirement,
-};
 
-use crate::coalesce_partitions::CoalescePartitionsExec;
-use crate::display::DisplayableExecutionPlan;
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, 
VerboseDisplay};
+pub(crate) use crate::execution_plan::execution_mode_from_children;
+pub use crate::execution_plan::{
+    collect, collect_partitioned, displayable, execute_input_stream, 
execute_stream,
+    execute_stream_partitioned, get_plan_string, 
with_new_children_if_necessary,
+    ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+};
 pub use crate::metrics::Metric;
-use crate::metrics::MetricsSet;
 pub use crate::ordering::InputOrderMode;
-use crate::repartition::RepartitionExec;
-use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
 pub use crate::stream::EmptyRecordBatchStream;
-use crate::stream::RecordBatchStreamAdapter;
 pub use crate::topk::TopK;
 pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
 
@@ -68,6 +58,7 @@ pub mod coalesce_partitions;
 pub mod common;
 pub mod display;
 pub mod empty;
+pub mod execution_plan;
 pub mod explain;
 pub mod filter;
 pub mod insert;
@@ -96,967 +87,5 @@ pub mod udaf {
     };
 }
 
-/// Represent nodes in the DataFusion Physical Plan.
-///
-/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
-/// [`RecordBatch`] that incrementally computes a partition of the
-/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
-/// details on partitioning.
-///
-/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
-/// properties of the output to the DataFusion optimizer, and methods such as
-/// [`required_input_distribution`] and [`required_input_ordering`] express
-/// requirements of the `ExecutionPlan` from its input.
-///
-/// [`ExecutionPlan`] can be displayed in a simplified form using the
-/// return value from [`displayable`] in addition to the (normally
-/// quite verbose) `Debug` output.
-///
-/// [`execute`]: ExecutionPlan::execute
-/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
-/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
-    /// Short name for the ExecutionPlan, such as 'ParquetExec'.
-    ///
-    /// Implementation note: this method can just proxy to
-    /// [`static_name`](ExecutionPlan::static_name) if no special action is
-    /// needed. It doesn't provide a default implementation like that because
-    /// this method doesn't require the `Sized` constrain to allow a wilder
-    /// range of use cases.
-    fn name(&self) -> &str;
-
-    /// Short name for the ExecutionPlan, such as 'ParquetExec'.
-    /// Like [`name`](ExecutionPlan::name) but can be called without an 
instance.
-    fn static_name() -> &'static str
-    where
-        Self: Sized,
-    {
-        let full_name = std::any::type_name::<Self>();
-        let maybe_start_idx = full_name.rfind(':');
-        match maybe_start_idx {
-            Some(start_idx) => &full_name[start_idx + 1..],
-            None => "UNKNOWN",
-        }
-    }
-
-    /// Returns the execution plan as [`Any`] so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// Get the schema for this execution plan
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(self.properties().schema())
-    }
-
-    /// Return properties of the output of the `ExecutionPlan`, such as output
-    /// ordering(s), partitioning information etc.
-    ///
-    /// This information is available via methods on 
[`ExecutionPlanProperties`]
-    /// trait, which is implemented for all `ExecutionPlan`s.
-    fn properties(&self) -> &PlanProperties;
-
-    /// Specifies the data distribution requirements for all the
-    /// children for this `ExecutionPlan`, By default it's 
[[Distribution::UnspecifiedDistribution]] for each child,
-    fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::UnspecifiedDistribution; self.children().len()]
-    }
-
-    /// Specifies the ordering required for all of the children of this
-    /// `ExecutionPlan`.
-    ///
-    /// For each child, it's the local ordering requirement within
-    /// each partition rather than the global ordering
-    ///
-    /// NOTE that checking `!is_empty()` does **not** check for a
-    /// required input ordering. Instead, the correct check is that at
-    /// least one entry must be `Some`
-    fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
-        vec![None; self.children().len()]
-    }
-
-    /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
-    /// rows within or between partitions.
-    ///
-    /// For example, Projection, Filter, and Limit maintain the order
-    /// of inputs -- they may transform values (Projection) or not
-    /// produce the same number of rows that went in (Filter and
-    /// Limit), but the rows that are produced go in the same way.
-    ///
-    /// DataFusion uses this metadata to apply certain optimizations
-    /// such as automatically repartitioning correctly.
-    ///
-    /// The default implementation returns `false`
-    ///
-    /// WARNING: if you override this default, you *MUST* ensure that
-    /// the `ExecutionPlan`'s maintains the ordering invariant or else
-    /// DataFusion may produce incorrect results.
-    fn maintains_input_order(&self) -> Vec<bool> {
-        vec![false; self.children().len()]
-    }
-
-    /// Specifies whether the `ExecutionPlan` benefits from increased
-    /// parallelization at its input for each child.
-    ///
-    /// If returns `true`, the `ExecutionPlan` would benefit from partitioning
-    /// its corresponding child (and thus from more parallelism). For
-    /// `ExecutionPlan` that do very little work the overhead of extra
-    /// parallelism may outweigh any benefits
-    ///
-    /// The default implementation returns `true` unless this `ExecutionPlan`
-    /// has signalled it requires a single child input partition.
-    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
-        // By default try to maximize parallelism with more CPUs if
-        // possible
-        self.required_input_distribution()
-            .into_iter()
-            .map(|dist| !matches!(dist, Distribution::SinglePartition))
-            .collect()
-    }
-
-    /// Get a list of children `ExecutionPlan`s that act as inputs to this 
plan.
-    /// The returned list will be empty for leaf nodes such as scans, will 
contain
-    /// a single value for unary nodes, or two values for binary nodes (such as
-    /// joins).
-    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
-
-    /// Returns a new `ExecutionPlan` where all existing children were replaced
-    /// by the `children`, in order
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>>;
-
-    /// If supported, attempt to increase the partitioning of this 
`ExecutionPlan` to
-    /// produce `target_partitions` partitions.
-    ///
-    /// If the `ExecutionPlan` does not support changing its partitioning,
-    /// returns `Ok(None)` (the default).
-    ///
-    /// It is the `ExecutionPlan` can increase its partitioning, but not to the
-    /// `target_partitions`, it may return an ExecutionPlan with fewer
-    /// partitions. This might happen, for example, if each new partition would
-    /// be too small to be efficiently processed individually.
-    ///
-    /// The DataFusion optimizer attempts to use as many threads as possible by
-    /// repartitioning its inputs to match the target number of threads
-    /// available (`target_partitions`). Some data sources, such as the built 
in
-    /// CSV and Parquet readers, implement this method as they are able to read
-    /// from their input files in parallel, regardless of how the source data 
is
-    /// split amongst files.
-    fn repartitioned(
-        &self,
-        _target_partitions: usize,
-        _config: &ConfigOptions,
-    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        Ok(None)
-    }
-
-    /// Begin execution of `partition`, returning a [`Stream`] of
-    /// [`RecordBatch`]es.
-    ///
-    /// # Notes
-    ///
-    /// The `execute` method itself is not `async` but it returns an `async`
-    /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
-    /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
-    /// Most `ExecutionPlan`s should not do any work before the first
-    /// `RecordBatch` is requested from the stream.
-    ///
-    /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
-    /// [`Stream`] into a [`SendableRecordBatchStream`].
-    ///
-    /// Using `async` `Streams` allows for network I/O during execution and
-    /// takes advantage of Rust's built in support for `async` continuations 
and
-    /// crate ecosystem.
-    ///
-    /// [`Stream`]: futures::stream::Stream
-    /// [`StreamExt`]: futures::stream::StreamExt
-    /// [`TryStreamExt`]: futures::stream::TryStreamExt
-    /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
-    ///
-    /// # Cancellation / Aborting Execution
-    ///
-    /// The [`Stream`] that is returned must ensure that any allocated 
resources
-    /// are freed when the stream itself is dropped. This is particularly
-    /// important for [`spawn`]ed tasks or threads. Unless care is taken to
-    /// "abort" such tasks, they may continue to consume resources even after
-    /// the plan is dropped, generating intermediate results that are never
-    /// used.
-    /// Thus, [`spawn`] is disallowed, and instead use [`SpawnedTask`].
-    ///
-    /// For more details see [`SpawnedTask`], [`JoinSet`] and 
[`RecordBatchReceiverStreamBuilder`]
-    /// for structures to help ensure all background tasks are cancelled.
-    ///
-    /// [`spawn`]: tokio::task::spawn
-    /// [`JoinSet`]: tokio::task::JoinSet
-    /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
-    /// [`RecordBatchReceiverStreamBuilder`]: 
crate::stream::RecordBatchReceiverStreamBuilder
-    ///
-    /// # Implementation Examples
-    ///
-    /// While `async` `Stream`s have a non trivial learning curve, the
-    /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
-    /// which help simplify many common operations.
-    ///
-    /// Here are some common patterns:
-    ///
-    /// ## Return Precomputed `RecordBatch`
-    ///
-    /// We can return a precomputed `RecordBatch` as a `Stream`:
-    ///
-    /// ```
-    /// # use std::sync::Arc;
-    /// # use arrow_array::RecordBatch;
-    /// # use arrow_schema::SchemaRef;
-    /// # use datafusion_common::Result;
-    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-    /// # use datafusion_physical_plan::memory::MemoryStream;
-    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
-    /// struct MyPlan {
-    ///     batch: RecordBatch,
-    /// }
-    ///
-    /// impl MyPlan {
-    ///     fn execute(
-    ///         &self,
-    ///         partition: usize,
-    ///         context: Arc<TaskContext>
-    ///     ) -> Result<SendableRecordBatchStream> {
-    ///         // use functions from futures crate convert the batch into a 
stream
-    ///         let fut = futures::future::ready(Ok(self.batch.clone()));
-    ///         let stream = futures::stream::once(fut);
-    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), 
stream)))
-    ///     }
-    /// }
-    /// ```
-    ///
-    /// ## Lazily (async) Compute `RecordBatch`
-    ///
-    /// We can also lazily compute a `RecordBatch` when the returned `Stream` 
is polled
-    ///
-    /// ```
-    /// # use std::sync::Arc;
-    /// # use arrow_array::RecordBatch;
-    /// # use arrow_schema::SchemaRef;
-    /// # use datafusion_common::Result;
-    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-    /// # use datafusion_physical_plan::memory::MemoryStream;
-    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
-    /// struct MyPlan {
-    ///     schema: SchemaRef,
-    /// }
-    ///
-    /// /// Returns a single batch when the returned stream is polled
-    /// async fn get_batch() -> Result<RecordBatch> {
-    ///     todo!()
-    /// }
-    ///
-    /// impl MyPlan {
-    ///     fn execute(
-    ///         &self,
-    ///         partition: usize,
-    ///         context: Arc<TaskContext>
-    ///     ) -> Result<SendableRecordBatchStream> {
-    ///         let fut = get_batch();
-    ///         let stream = futures::stream::once(fut);
-    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), 
stream)))
-    ///     }
-    /// }
-    /// ```
-    ///
-    /// ## Lazily (async) create a Stream
-    ///
-    /// If you need to create the return `Stream` using an `async` function,
-    /// you can do so by flattening the result:
-    ///
-    /// ```
-    /// # use std::sync::Arc;
-    /// # use arrow_array::RecordBatch;
-    /// # use arrow_schema::SchemaRef;
-    /// # use futures::TryStreamExt;
-    /// # use datafusion_common::Result;
-    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-    /// # use datafusion_physical_plan::memory::MemoryStream;
-    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
-    /// struct MyPlan {
-    ///     schema: SchemaRef,
-    /// }
-    ///
-    /// /// async function that returns a stream
-    /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
-    ///     todo!()
-    /// }
-    ///
-    /// impl MyPlan {
-    ///     fn execute(
-    ///         &self,
-    ///         partition: usize,
-    ///         context: Arc<TaskContext>
-    ///     ) -> Result<SendableRecordBatchStream> {
-    ///         // A future that yields a stream
-    ///         let fut = get_batch_stream();
-    ///         // Use TryStreamExt::try_flatten to flatten the stream of 
streams
-    ///         let stream = futures::stream::once(fut).try_flatten();
-    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), 
stream)))
-    ///     }
-    /// }
-    /// ```
-    fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream>;
-
-    /// Return a snapshot of the set of [`Metric`]s for this
-    /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
-    ///
-    /// While the values of the metrics in the returned
-    /// [`MetricsSet`]s may change as execution progresses, the
-    /// specific metrics will not.
-    ///
-    /// Once `self.execute()` has returned (technically the future is
-    /// resolved) for all available partitions, the set of metrics
-    /// should be complete. If this function is called prior to
-    /// `execute()` new metrics may appear in subsequent calls.
-    fn metrics(&self) -> Option<MetricsSet> {
-        None
-    }
-
-    /// Returns statistics for this `ExecutionPlan` node. If statistics are not
-    /// available, should return [`Statistics::new_unknown`] (the default), not
-    /// an error.
-    fn statistics(&self) -> Result<Statistics> {
-        Ok(Statistics::new_unknown(&self.schema()))
-    }
-
-    /// Returns `true` if a limit can be safely pushed down through this
-    /// `ExecutionPlan` node.
-    ///
-    /// If this method returns `true`, and the query plan contains a limit at
-    /// the output of this node, DataFusion will push the limit to the input
-    /// of this node.
-    fn supports_limit_pushdown(&self) -> bool {
-        false
-    }
-
-    /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
-    /// fetch limits. Returns `None` otherwise.
-    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn 
ExecutionPlan>> {
-        None
-    }
-}
-
-/// Extension trait provides an easy API to fetch various properties of
-/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
-pub trait ExecutionPlanProperties {
-    /// Specifies how the output of this `ExecutionPlan` is split into
-    /// partitions.
-    fn output_partitioning(&self) -> &Partitioning;
-
-    /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but its input(s) are
-    /// infinite, returns [`ExecutionMode::PipelineBreaking`] to indicate this.
-    fn execution_mode(&self) -> ExecutionMode;
-
-    /// If the output of this `ExecutionPlan` within each partition is sorted,
-    /// returns `Some(keys)` describing the ordering. A `None` return value
-    /// indicates no assumptions should be made on the output ordering.
-    ///
-    /// For example, `SortExec` (obviously) produces sorted output as does
-    /// `SortPreservingMergeStream`. Less obviously, `Projection` produces 
sorted
-    /// output if its input is sorted as it does not reorder the input rows.
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
-
-    /// Get the [`EquivalenceProperties`] within the plan.
-    ///
-    /// Equivalence properties tell DataFusion what columns are known to be
-    /// equal, during various optimization passes. By default, this returns "no
-    /// known equivalences" which is always correct, but may cause DataFusion 
to
-    /// unnecessarily resort data.
-    ///
-    /// If this ExecutionPlan makes no changes to the schema of the rows 
flowing
-    /// through it or how columns within each row relate to each other, it
-    /// should return the equivalence properties of its input. For
-    /// example, since `FilterExec` may remove rows from its input, but does 
not
-    /// otherwise modify them, it preserves its input equivalence properties.
-    /// However, since `ProjectionExec` may calculate derived expressions, it
-    /// needs special handling.
-    ///
-    /// See also [`ExecutionPlan::maintains_input_order`] and 
[`Self::output_ordering`]
-    /// for related concepts.
-    fn equivalence_properties(&self) -> &EquivalenceProperties;
-}
-
-impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
-    fn output_partitioning(&self) -> &Partitioning {
-        self.properties().output_partitioning()
-    }
-
-    fn execution_mode(&self) -> ExecutionMode {
-        self.properties().execution_mode()
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.properties().output_ordering()
-    }
-
-    fn equivalence_properties(&self) -> &EquivalenceProperties {
-        self.properties().equivalence_properties()
-    }
-}
-
-impl ExecutionPlanProperties for &dyn ExecutionPlan {
-    fn output_partitioning(&self) -> &Partitioning {
-        self.properties().output_partitioning()
-    }
-
-    fn execution_mode(&self) -> ExecutionMode {
-        self.properties().execution_mode()
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.properties().output_ordering()
-    }
-
-    fn equivalence_properties(&self) -> &EquivalenceProperties {
-        self.properties().equivalence_properties()
-    }
-}
-
-/// Describes the execution mode of an operator's resulting stream with respect
-/// to its size and behavior. There are three possible execution modes: 
`Bounded`,
-/// `Unbounded` and `PipelineBreaking`.
-#[derive(Clone, Copy, PartialEq, Debug)]
-pub enum ExecutionMode {
-    /// Represents the mode where generated stream is bounded, e.g. finite.
-    Bounded,
-    /// Represents the mode where generated stream is unbounded, e.g. infinite.
-    /// Even though the operator generates an unbounded stream of results, it
-    /// works with bounded memory and execution can still continue 
successfully.
-    ///
-    /// The stream that results from calling `execute` on an `ExecutionPlan` 
that is `Unbounded`
-    /// will never be done (return `None`), except in case of error.
-    Unbounded,
-    /// Represents the mode where some of the operator's input stream(s) are
-    /// unbounded; however, the operator cannot generate streaming results from
-    /// these streaming inputs. In this case, the execution mode will be 
pipeline
-    /// breaking, e.g. the operator requires unbounded memory to generate 
results.
-    PipelineBreaking,
-}
-
-impl ExecutionMode {
-    /// Check whether the execution mode is unbounded or not.
-    pub fn is_unbounded(&self) -> bool {
-        matches!(self, ExecutionMode::Unbounded)
-    }
-
-    /// Check whether the execution is pipeline friendly. If so, operator can
-    /// execute safely.
-    pub fn pipeline_friendly(&self) -> bool {
-        matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
-    }
-}
-
-/// Conservatively "combines" execution modes of a given collection of 
operators.
-fn execution_mode_from_children<'a>(
-    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
-) -> ExecutionMode {
-    let mut result = ExecutionMode::Bounded;
-    for mode in children.into_iter().map(|child| child.execution_mode()) {
-        match (mode, result) {
-            (ExecutionMode::PipelineBreaking, _)
-            | (_, ExecutionMode::PipelineBreaking) => {
-                // If any of the modes is `PipelineBreaking`, so is the result:
-                return ExecutionMode::PipelineBreaking;
-            }
-            (ExecutionMode::Unbounded, _) | (_, ExecutionMode::Unbounded) => {
-                // Unbounded mode eats up bounded mode:
-                result = ExecutionMode::Unbounded;
-            }
-            (ExecutionMode::Bounded, ExecutionMode::Bounded) => {
-                // When both modes are bounded, so is the result:
-                result = ExecutionMode::Bounded;
-            }
-        }
-    }
-    result
-}
-
-/// Stores certain, often expensive to compute, plan properties used in query
-/// optimization.
-///
-/// These properties are stored a single structure to permit this information 
to
-/// be computed once and then those cached results used multiple times without
-/// recomputation (aka a cache)
-#[derive(Debug, Clone)]
-pub struct PlanProperties {
-    /// See [ExecutionPlanProperties::equivalence_properties]
-    pub eq_properties: EquivalenceProperties,
-    /// See [ExecutionPlanProperties::output_partitioning]
-    pub partitioning: Partitioning,
-    /// See [ExecutionPlanProperties::execution_mode]
-    pub execution_mode: ExecutionMode,
-    /// See [ExecutionPlanProperties::output_ordering]
-    output_ordering: Option<LexOrdering>,
-}
-
-impl PlanProperties {
-    /// Construct a new `PlanPropertiesCache` from the
-    pub fn new(
-        eq_properties: EquivalenceProperties,
-        partitioning: Partitioning,
-        execution_mode: ExecutionMode,
-    ) -> Self {
-        // Output ordering can be derived from `eq_properties`.
-        let output_ordering = eq_properties.output_ordering();
-        Self {
-            eq_properties,
-            partitioning,
-            execution_mode,
-            output_ordering,
-        }
-    }
-
-    /// Overwrite output partitioning with its new value.
-    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
-        self.partitioning = partitioning;
-        self
-    }
-
-    /// Overwrite the execution Mode with its new value.
-    pub fn with_execution_mode(mut self, execution_mode: ExecutionMode) -> 
Self {
-        self.execution_mode = execution_mode;
-        self
-    }
-
-    /// Overwrite equivalence properties with its new value.
-    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) 
-> Self {
-        // Changing equivalence properties also changes output ordering, so
-        // make sure to overwrite it:
-        self.output_ordering = eq_properties.output_ordering();
-        self.eq_properties = eq_properties;
-        self
-    }
-
-    pub fn equivalence_properties(&self) -> &EquivalenceProperties {
-        &self.eq_properties
-    }
-
-    pub fn output_partitioning(&self) -> &Partitioning {
-        &self.partitioning
-    }
-
-    pub fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.output_ordering.as_deref()
-    }
-
-    pub fn execution_mode(&self) -> ExecutionMode {
-        self.execution_mode
-    }
-
-    /// Get schema of the node.
-    fn schema(&self) -> &SchemaRef {
-        self.eq_properties.schema()
-    }
-}
-
-/// Indicate whether a data exchange is needed for the input of `plan`, which 
will be very helpful
-/// especially for the distributed engine to judge whether need to deal with 
shuffling.
-/// Currently there are 3 kinds of execution plan which needs data exchange
-///     1. RepartitionExec for changing the partition number between two 
`ExecutionPlan`s
-///     2. CoalescePartitionsExec for collapsing all of the partitions into 
one without ordering guarantee
-///     3. SortPreservingMergeExec for collapsing all of the sorted partitions 
into one with ordering guarantee
-pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
-    if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() 
{
-        !matches!(
-            repartition.properties().output_partitioning(),
-            Partitioning::RoundRobinBatch(_)
-        )
-    } else if let Some(coalesce) = 
plan.as_any().downcast_ref::<CoalescePartitionsExec>()
-    {
-        coalesce.input().output_partitioning().partition_count() > 1
-    } else if let Some(sort_preserving_merge) =
-        plan.as_any().downcast_ref::<SortPreservingMergeExec>()
-    {
-        sort_preserving_merge
-            .input()
-            .output_partitioning()
-            .partition_count()
-            > 1
-    } else {
-        false
-    }
-}
-
-/// Returns a copy of this plan if we change any child according to the 
pointer comparison.
-/// The size of `children` must be equal to the size of 
`ExecutionPlan::children()`.
-pub fn with_new_children_if_necessary(
-    plan: Arc<dyn ExecutionPlan>,
-    children: Vec<Arc<dyn ExecutionPlan>>,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    let old_children = plan.children();
-    if children.len() != old_children.len() {
-        internal_err!("Wrong number of children")
-    } else if children.is_empty()
-        || children
-            .iter()
-            .zip(old_children.iter())
-            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
-    {
-        plan.with_new_children(children)
-    } else {
-        Ok(plan)
-    }
-}
-
-/// Return a [wrapper](DisplayableExecutionPlan) around an
-/// [`ExecutionPlan`] which can be displayed in various easier to
-/// understand ways.
-pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
-    DisplayableExecutionPlan::new(plan)
-}
-
-/// Execute the [ExecutionPlan] and collect the results in memory
-pub async fn collect(
-    plan: Arc<dyn ExecutionPlan>,
-    context: Arc<TaskContext>,
-) -> Result<Vec<RecordBatch>> {
-    let stream = execute_stream(plan, context)?;
-    common::collect(stream).await
-}
-
-/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
-///
-/// See [collect] to buffer the `RecordBatch`es in memory.
-///
-/// # Aborting Execution
-///
-/// Dropping the stream will abort the execution of the query, and free up
-/// any allocated resources
-pub fn execute_stream(
-    plan: Arc<dyn ExecutionPlan>,
-    context: Arc<TaskContext>,
-) -> Result<SendableRecordBatchStream> {
-    match plan.output_partitioning().partition_count() {
-        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
-        1 => plan.execute(0, context),
-        _ => {
-            // merge into a single partition
-            let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
-            // CoalescePartitionsExec must produce a single partition
-            assert_eq!(1, 
plan.properties().output_partitioning().partition_count());
-            plan.execute(0, context)
-        }
-    }
-}
-
-/// Execute the [ExecutionPlan] and collect the results in memory
-pub async fn collect_partitioned(
-    plan: Arc<dyn ExecutionPlan>,
-    context: Arc<TaskContext>,
-) -> Result<Vec<Vec<RecordBatch>>> {
-    let streams = execute_stream_partitioned(plan, context)?;
-
-    let mut join_set = JoinSet::new();
-    // Execute the plan and collect the results into batches.
-    streams.into_iter().enumerate().for_each(|(idx, stream)| {
-        join_set.spawn(async move {
-            let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
-            (idx, result)
-        });
-    });
-
-    let mut batches = vec![];
-    // Note that currently this doesn't identify the thread that panicked
-    //
-    // TODO: Replace with 
[join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id
-    // once it is stable
-    while let Some(result) = join_set.join_next().await {
-        match result {
-            Ok((idx, res)) => batches.push((idx, res?)),
-            Err(e) => {
-                if e.is_panic() {
-                    std::panic::resume_unwind(e.into_panic());
-                } else {
-                    unreachable!();
-                }
-            }
-        }
-    }
-
-    batches.sort_by_key(|(idx, _)| *idx);
-    let batches = batches.into_iter().map(|(_, batch)| batch).collect();
-
-    Ok(batches)
-}
-
-/// Execute the [ExecutionPlan] and return a vec with one stream per output
-/// partition
-///
-/// # Aborting Execution
-///
-/// Dropping the stream will abort the execution of the query, and free up
-/// any allocated resources
-pub fn execute_stream_partitioned(
-    plan: Arc<dyn ExecutionPlan>,
-    context: Arc<TaskContext>,
-) -> Result<Vec<SendableRecordBatchStream>> {
-    let num_partitions = plan.output_partitioning().partition_count();
-    let mut streams = Vec::with_capacity(num_partitions);
-    for i in 0..num_partitions {
-        streams.push(plan.execute(i, Arc::clone(&context))?);
-    }
-    Ok(streams)
-}
-
-/// Executes an input stream and ensures that the resulting stream adheres to
-/// the `not null` constraints specified in the `sink_schema`.
-///
-/// # Arguments
-///
-/// * `input` - An execution plan
-/// * `sink_schema` - The schema to be applied to the output stream
-/// * `partition` - The partition index to be executed
-/// * `context` - The task context
-///
-/// # Returns
-///
-/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if 
successful
-///
-/// This function first executes the given input plan for the specified 
partition
-/// and context. It then checks if there are any columns in the input that 
might
-/// violate the `not null` constraints specified in the `sink_schema`. If 
there are
-/// such columns, it wraps the resulting stream to enforce the `not null` 
constraints
-/// by invoking the `check_not_null_contraits` function on each batch of the 
stream.
-pub fn execute_input_stream(
-    input: Arc<dyn ExecutionPlan>,
-    sink_schema: SchemaRef,
-    partition: usize,
-    context: Arc<TaskContext>,
-) -> Result<SendableRecordBatchStream> {
-    let input_stream = input.execute(partition, context)?;
-
-    debug_assert_eq!(sink_schema.fields().len(), 
input.schema().fields().len());
-
-    // Find input columns that may violate the not null constraint.
-    let risky_columns: Vec<_> = sink_schema
-        .fields()
-        .iter()
-        .zip(input.schema().fields().iter())
-        .enumerate()
-        .filter_map(|(idx, (sink_field, input_field))| {
-            (!sink_field.is_nullable() && 
input_field.is_nullable()).then_some(idx)
-        })
-        .collect();
-
-    if risky_columns.is_empty() {
-        Ok(input_stream)
-    } else {
-        // Check not null constraint on the input stream
-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            sink_schema,
-            input_stream
-                .map(move |batch| check_not_null_contraits(batch?, 
&risky_columns)),
-        )))
-    }
-}
-
-/// Checks a `RecordBatch` for `not null` constraints on specified columns.
-///
-/// # Arguments
-///
-/// * `batch` - The `RecordBatch` to be checked
-/// * `column_indices` - A vector of column indices that should be checked for
-///   `not null` constraints.
-///
-/// # Returns
-///
-/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints 
are met
-///
-/// This function iterates over the specified column indices and ensures that 
none
-/// of the columns contain null values. If any column contains null values, an 
error
-/// is returned.
-pub fn check_not_null_contraits(
-    batch: RecordBatch,
-    column_indices: &Vec<usize>,
-) -> Result<RecordBatch> {
-    for &index in column_indices {
-        if batch.num_columns() <= index {
-            return exec_err!(
-                "Invalid batch column count {} expected > {}",
-                batch.num_columns(),
-                index
-            );
-        }
-
-        if batch.column(index).null_count() > 0 {
-            return exec_err!(
-                "Invalid batch column at '{}' has null but schema specifies 
non-nullable",
-                index
-            );
-        }
-    }
-
-    Ok(batch)
-}
-
-/// Utility function yielding a string representation of the given 
[`ExecutionPlan`].
-pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
-    let formatted = displayable(plan.as_ref()).indent(true).to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    actual.iter().map(|elem| elem.to_string()).collect()
-}
-
 #[cfg(test)]
-mod tests {
-    use std::any::Any;
-    use std::sync::Arc;
-
-    use arrow_schema::{Schema, SchemaRef};
-
-    use datafusion_common::{Result, Statistics};
-    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-
-    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
-
-    #[derive(Debug)]
-    pub struct EmptyExec;
-
-    impl EmptyExec {
-        pub fn new(_schema: SchemaRef) -> Self {
-            Self
-        }
-    }
-
-    impl DisplayAs for EmptyExec {
-        fn fmt_as(
-            &self,
-            _t: DisplayFormatType,
-            _f: &mut std::fmt::Formatter,
-        ) -> std::fmt::Result {
-            unimplemented!()
-        }
-    }
-
-    impl ExecutionPlan for EmptyExec {
-        fn name(&self) -> &'static str {
-            Self::static_name()
-        }
-
-        fn as_any(&self) -> &dyn Any {
-            self
-        }
-
-        fn properties(&self) -> &PlanProperties {
-            unimplemented!()
-        }
-
-        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-            vec![]
-        }
-
-        fn with_new_children(
-            self: Arc<Self>,
-            _: Vec<Arc<dyn ExecutionPlan>>,
-        ) -> Result<Arc<dyn ExecutionPlan>> {
-            unimplemented!()
-        }
-
-        fn execute(
-            &self,
-            _partition: usize,
-            _context: Arc<TaskContext>,
-        ) -> Result<SendableRecordBatchStream> {
-            unimplemented!()
-        }
-
-        fn statistics(&self) -> Result<Statistics> {
-            unimplemented!()
-        }
-    }
-
-    #[derive(Debug)]
-    pub struct RenamedEmptyExec;
-
-    impl RenamedEmptyExec {
-        pub fn new(_schema: SchemaRef) -> Self {
-            Self
-        }
-    }
-
-    impl DisplayAs for RenamedEmptyExec {
-        fn fmt_as(
-            &self,
-            _t: DisplayFormatType,
-            _f: &mut std::fmt::Formatter,
-        ) -> std::fmt::Result {
-            unimplemented!()
-        }
-    }
-
-    impl ExecutionPlan for RenamedEmptyExec {
-        fn name(&self) -> &'static str {
-            Self::static_name()
-        }
-
-        fn static_name() -> &'static str
-        where
-            Self: Sized,
-        {
-            "MyRenamedEmptyExec"
-        }
-
-        fn as_any(&self) -> &dyn Any {
-            self
-        }
-
-        fn properties(&self) -> &PlanProperties {
-            unimplemented!()
-        }
-
-        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-            vec![]
-        }
-
-        fn with_new_children(
-            self: Arc<Self>,
-            _: Vec<Arc<dyn ExecutionPlan>>,
-        ) -> Result<Arc<dyn ExecutionPlan>> {
-            unimplemented!()
-        }
-
-        fn execute(
-            &self,
-            _partition: usize,
-            _context: Arc<TaskContext>,
-        ) -> Result<SendableRecordBatchStream> {
-            unimplemented!()
-        }
-
-        fn statistics(&self) -> Result<Statistics> {
-            unimplemented!()
-        }
-    }
-
-    #[test]
-    fn test_execution_plan_name() {
-        let schema1 = Arc::new(Schema::empty());
-        let default_name_exec = EmptyExec::new(schema1);
-        assert_eq!(default_name_exec.name(), "EmptyExec");
-
-        let schema2 = Arc::new(Schema::empty());
-        let renamed_exec = RenamedEmptyExec::new(schema2);
-        assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
-        assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
-    }
-
-    /// A compilation test to ensure that the `ExecutionPlan::name()` method 
can
-    /// be called from a trait object.
-    /// Related ticket: https://github.com/apache/datafusion/pull/11047
-    #[allow(dead_code)]
-    fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
-        let _ = plan.name();
-    }
-}
-
 pub mod test;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to