This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 81d06f2e10 Move OutputRequirements to datafusion-physical-optimizer 
crate (#11579)
81d06f2e10 is described below

commit 81d06f2e103385fe744fb909563d4fb4c4b13d49
Author: Xin Li <[email protected]>
AuthorDate: Mon Jul 22 05:37:55 2024 -0700

    Move OutputRequirements to datafusion-physical-optimizer crate (#11579)
    
    * Move OutputRequirements to datafusion-physical-optimizer crate
    
    * Fix fmt
    
    * Fix cargo for cli
---
 datafusion-cli/Cargo.lock                          | 10 +++++----
 .../src/physical_optimizer/enforce_distribution.rs |  4 ++--
 datafusion/core/src/physical_optimizer/mod.rs      |  1 -
 datafusion/physical-optimizer/Cargo.toml           |  2 ++
 datafusion/physical-optimizer/src/lib.rs           |  1 +
 .../src}/output_requirements.rs                    | 24 +++++++++++++---------
 6 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 61d9c72b89..84bff8c871 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -118,9 +118,9 @@ dependencies = [
 
 [[package]]
 name = "arrayref"
-version = "0.3.7"
+version = "0.3.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545"
+checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a"
 
 [[package]]
 name = "arrayvec"
@@ -875,9 +875,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.1.5"
+version = "1.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052"
+checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f"
 dependencies = [
  "jobserver",
  "libc",
@@ -1397,6 +1397,8 @@ name = "datafusion-physical-optimizer"
 version = "40.0.0"
 dependencies = [
  "datafusion-common",
+ "datafusion-execution",
+ "datafusion-physical-expr",
  "datafusion-physical-plan",
 ]
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 9791f23f96..62ac9089e2 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -24,7 +24,6 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use super::output_requirements::OutputRequirementExec;
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::{
@@ -55,6 +54,7 @@ use datafusion_physical_expr::{
 use datafusion_physical_plan::windows::{get_best_fitting_window, 
BoundedWindowAggExec};
 use datafusion_physical_plan::ExecutionPlanProperties;
 
+use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use itertools::izip;
 
@@ -1290,7 +1290,6 @@ pub(crate) mod tests {
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::datasource::physical_plan::{CsvExec, FileScanConfig, 
ParquetExec};
     use crate::physical_optimizer::enforce_sorting::EnforceSorting;
-    use crate::physical_optimizer::output_requirements::OutputRequirements;
     use crate::physical_optimizer::test_utils::{
         check_integrity, coalesce_partitions_exec, repartition_exec,
     };
@@ -1301,6 +1300,7 @@ pub(crate) mod tests {
     use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use crate::physical_plan::sorts::sort::SortExec;
     use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, 
Statistics};
+    use datafusion_physical_optimizer::output_requirements::OutputRequirements;
 
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::ScalarValue;
diff --git a/datafusion/core/src/physical_optimizer/mod.rs 
b/datafusion/core/src/physical_optimizer/mod.rs
index 582f340151..a0c9c36977 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -29,7 +29,6 @@ pub mod enforce_sorting;
 pub mod join_selection;
 pub mod limited_distinct_aggregation;
 pub mod optimizer;
-pub mod output_requirements;
 pub mod projection_pushdown;
 pub mod pruning;
 pub mod replace_with_order_preserving_variants;
diff --git a/datafusion/physical-optimizer/Cargo.toml 
b/datafusion/physical-optimizer/Cargo.toml
index 9c0ee61da5..125ea6acc7 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -33,4 +33,6 @@ workspace = true
 
 [dependencies]
 datafusion-common = { workspace = true, default-features = true }
+datafusion-execution = { workspace = true }
+datafusion-physical-expr = { workspace = true }
 datafusion-physical-plan = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/lib.rs 
b/datafusion/physical-optimizer/src/lib.rs
index c5a49216f5..6b9df7cad5 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -18,5 +18,6 @@
 #![deny(clippy::clone_on_ref_ptr)]
 
 mod optimizer;
+pub mod output_requirements;
 
 pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs 
b/datafusion/physical-optimizer/src/output_requirements.rs
similarity index 94%
rename from datafusion/core/src/physical_optimizer/output_requirements.rs
rename to datafusion/physical-optimizer/src/output_requirements.rs
index cb9a0cb90e..f971d8f1f0 100644
--- a/datafusion/core/src/physical_optimizer/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -24,17 +24,21 @@
 
 use std::sync::Arc;
 
-use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion_execution::TaskContext;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
+};
 
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Result, Statistics};
 use datafusion_physical_expr::{Distribution, LexRequirement, 
PhysicalSortRequirement};
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};
 
+use crate::PhysicalOptimizerRule;
+
 /// This rule either adds or removes [`OutputRequirements`]s to/from the 
physical
 /// plan according to its `mode` attribute, which is set by the constructors
 /// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
@@ -86,7 +90,7 @@ enum RuleMode {
 ///
 /// See [`OutputRequirements`] for more details
 #[derive(Debug)]
-pub(crate) struct OutputRequirementExec {
+pub struct OutputRequirementExec {
     input: Arc<dyn ExecutionPlan>,
     order_requirement: Option<LexRequirement>,
     dist_requirement: Distribution,
@@ -94,7 +98,7 @@ pub(crate) struct OutputRequirementExec {
 }
 
 impl OutputRequirementExec {
-    pub(crate) fn new(
+    pub fn new(
         input: Arc<dyn ExecutionPlan>,
         requirements: Option<LexRequirement>,
         dist_requirement: Distribution,
@@ -108,8 +112,8 @@ impl OutputRequirementExec {
         }
     }
 
-    pub(crate) fn input(&self) -> Arc<dyn ExecutionPlan> {
-        self.input.clone()
+    pub fn input(&self) -> Arc<dyn ExecutionPlan> {
+        Arc::clone(&self.input)
     }
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
@@ -179,8 +183,8 @@ impl ExecutionPlan for OutputRequirementExec {
     fn execute(
         &self,
         _partition: usize,
-        _context: Arc<crate::execution::context::TaskContext>,
-    ) -> Result<crate::physical_plan::SendableRecordBatchStream> {
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
         unreachable!();
     }
 
@@ -275,7 +279,7 @@ fn require_top_ordering_helper(
         // When an operator requires an ordering, any `SortExec` below can not
         // be responsible for (i.e. the originator of) the global ordering.
         let (new_child, is_changed) =
-            require_top_ordering_helper(children.swap_remove(0).clone())?;
+            require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?;
         Ok((plan.with_new_children(vec![new_child])?, is_changed))
     } else {
         // Stop searching, there is no global ordering desired for the query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to