This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 81d06f2e10 Move OutputRequirements to datafusion-physical-optimizer
crate (#11579)
81d06f2e10 is described below
commit 81d06f2e103385fe744fb909563d4fb4c4b13d49
Author: Xin Li <[email protected]>
AuthorDate: Mon Jul 22 05:37:55 2024 -0700
Move OutputRequirements to datafusion-physical-optimizer crate (#11579)
* Move OutputRequirements to datafusion-physical-optimizer crate
* Fix fmt
* Fix cargo for cli
---
datafusion-cli/Cargo.lock | 10 +++++----
.../src/physical_optimizer/enforce_distribution.rs | 4 ++--
datafusion/core/src/physical_optimizer/mod.rs | 1 -
datafusion/physical-optimizer/Cargo.toml | 2 ++
datafusion/physical-optimizer/src/lib.rs | 1 +
.../src}/output_requirements.rs | 24 +++++++++++++---------
6 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 61d9c72b89..84bff8c871 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -118,9 +118,9 @@ dependencies = [
[[package]]
name = "arrayref"
-version = "0.3.7"
+version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545"
+checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a"
[[package]]
name = "arrayvec"
@@ -875,9 +875,9 @@ dependencies = [
[[package]]
name = "cc"
-version = "1.1.5"
+version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052"
+checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f"
dependencies = [
"jobserver",
"libc",
@@ -1397,6 +1397,8 @@ name = "datafusion-physical-optimizer"
version = "40.0.0"
dependencies = [
"datafusion-common",
+ "datafusion-execution",
+ "datafusion-physical-expr",
"datafusion-physical-plan",
]
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 9791f23f96..62ac9089e2 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -24,7 +24,6 @@
use std::fmt::Debug;
use std::sync::Arc;
-use super::output_requirements::OutputRequirementExec;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
@@ -55,6 +54,7 @@ use datafusion_physical_expr::{
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;
+use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use itertools::izip;
@@ -1290,7 +1290,6 @@ pub(crate) mod tests {
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
- use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_optimizer::test_utils::{
check_integrity, coalesce_partitions_exec, repartition_exec,
};
@@ -1301,6 +1300,7 @@ pub(crate) mod tests {
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
+ use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::ScalarValue;
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 582f340151..a0c9c36977 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -29,7 +29,6 @@ pub mod enforce_sorting;
pub mod join_selection;
pub mod limited_distinct_aggregation;
pub mod optimizer;
-pub mod output_requirements;
pub mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml
index 9c0ee61da5..125ea6acc7 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -33,4 +33,6 @@ workspace = true
[dependencies]
datafusion-common = { workspace = true, default-features = true }
+datafusion-execution = { workspace = true }
+datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index c5a49216f5..6b9df7cad5 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -18,5 +18,6 @@
#![deny(clippy::clone_on_ref_ptr)]
mod optimizer;
+pub mod output_requirements;
pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs
similarity index 94%
rename from datafusion/core/src/physical_optimizer/output_requirements.rs
rename to datafusion/physical-optimizer/src/output_requirements.rs
index cb9a0cb90e..f971d8f1f0 100644
--- a/datafusion/core/src/physical_optimizer/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -24,17 +24,21 @@
use std::sync::Arc;
-use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion_execution::TaskContext;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::{
+ DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
+};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};
+use crate::PhysicalOptimizerRule;
+
/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
/// plan according to its `mode` attribute, which is set by the constructors
/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
@@ -86,7 +90,7 @@ enum RuleMode {
///
/// See [`OutputRequirements`] for more details
#[derive(Debug)]
-pub(crate) struct OutputRequirementExec {
+pub struct OutputRequirementExec {
input: Arc<dyn ExecutionPlan>,
order_requirement: Option<LexRequirement>,
dist_requirement: Distribution,
@@ -94,7 +98,7 @@ pub(crate) struct OutputRequirementExec {
}
impl OutputRequirementExec {
- pub(crate) fn new(
+ pub fn new(
input: Arc<dyn ExecutionPlan>,
requirements: Option<LexRequirement>,
dist_requirement: Distribution,
@@ -108,8 +112,8 @@ impl OutputRequirementExec {
}
}
- pub(crate) fn input(&self) -> Arc<dyn ExecutionPlan> {
- self.input.clone()
+ pub fn input(&self) -> Arc<dyn ExecutionPlan> {
+ Arc::clone(&self.input)
}
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -179,8 +183,8 @@ impl ExecutionPlan for OutputRequirementExec {
fn execute(
&self,
_partition: usize,
- _context: Arc<crate::execution::context::TaskContext>,
- ) -> Result<crate::physical_plan::SendableRecordBatchStream> {
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
unreachable!();
}
@@ -275,7 +279,7 @@ fn require_top_ordering_helper(
// When an operator requires an ordering, any `SortExec` below can not
// be responsible for (i.e. the originator of) the global ordering.
let (new_child, is_changed) =
- require_top_ordering_helper(children.swap_remove(0).clone())?;
+ require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?;
Ok((plan.with_new_children(vec![new_child])?, is_changed))
} else {
// Stop searching, there is no global ordering desired for the query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]