alamb commented on code in PR #4122:
URL: https://github.com/apache/arrow-datafusion/pull/4122#discussion_r1017824178


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1548,6 +1551,7 @@ impl SessionState {
             Arc::new(AggregateStatistics::new()),
             Arc::new(HashBuildProbeOrder::new()),
         ];
+        physical_optimizers.push(Arc::new(BasicEnforcement::new()));

Review Comment:
   Is there a reason that `BasicEnforcement` must be run twice?



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -195,15 +175,70 @@ impl EquivalentClass {
     }
 }
 
+/// Project Equivalence Properties.
+/// 1) Add Alias, Alias can introduce additional equivalence properties,

Review Comment:
   👍 



##########
datafusion/core/src/physical_optimizer/enforcement.rs:
##########
@@ -0,0 +1,2001 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Enforcement optimizer rules are used to make sure the plan's Distribution 
and Ordering
+//! requirements are met by inserting necessary [[RepartitionExec]] and 
[[SortExec]].
+//!
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::joins::{
+    CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::windows::WindowAggExec;
+use crate::physical_plan::Partitioning;
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, 
ExecutionPlan};
+use crate::prelude::SessionConfig;
+use datafusion_expr::logical_plan::JoinType;
+use datafusion_physical_expr::equivalence::EquivalenceProperties;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::expressions::NoOp;
+use datafusion_physical_expr::{
+    expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
+    normalize_sort_expr_with_equivalence_properties, PhysicalExpr, 
PhysicalSortExpr,
+};
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// BasicEnforcement rule, it ensures the Distribution and Ordering 
requirements are met
+/// in the strictest way. It might add additional [[RepartitionExec]] to the 
plan tree
+/// and give a non-optimal plan, but it can avoid the possible data skew in 
joins

Review Comment:
   I don't understand this comment about 'non optimal plan' -- I think the 
point of this pass is to ensure that the plan gets the correct answer :) If the 
plan doesn't get the correct answer it can't be optimal
   



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -136,7 +136,7 @@ impl ExecutionPlan for WindowAggExec {
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
         if self.partition_keys.is_empty() {
-            warn!("No partition defined for WindowAggExec!!!");
+            debug!("No partition defined for WindowAggExec!!!");

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -477,6 +473,66 @@ impl Partitioning {
             RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
         }
     }
+
+    /// Returns true when the guarantees made by this [[Partitioning]] are 
sufficient to
+    /// satisfy the partitioning scheme mandated by the `required` 
[[Distribution]]
+    pub fn satisfy<F: FnOnce() -> EquivalenceProperties>(
+        &self,
+        required: Distribution,
+        equal_properties: F,
+    ) -> bool {
+        match required {
+            Distribution::UnspecifiedDistribution => true,
+            Distribution::SinglePartition if self.partition_count() == 1 => 
true,
+            Distribution::HashPartitioned(required_exprs) => {
+                match self {
+                    // Here we do not check the partition count for hash 
partitioning and assumes the partition count
+                    // and hash functions in the system are the same. In 
future if we plan to support storage partition-wise joins,
+                    // then we need to have the partition count and hash 
functions validation.
+                    Partitioning::Hash(partition_exprs, _) => {
+                        let fast_match =
+                            expr_list_eq_strict_order(&required_exprs, 
partition_exprs);

Review Comment:
   You might be able to reduce the indent level of this code with a construct 
like
   ```rust
   if expr_list_eq_strict_order(&required_exprs, partition_exprs) {
     return true
   }
   ```



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -539,19 +539,6 @@ impl DefaultPhysicalPlanner {
                         vec![]
                     };
 
-                    let input_exec = if can_repartition {

Review Comment:
   it is nice to avoid adding repartitioning directly in the planner and 
instead add it as a follow on pass



##########
datafusion/core/src/physical_plan/rewrite.rs:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Trait to make Executionplan rewritable
+
+use crate::physical_plan::with_new_children_if_necessary;
+use crate::physical_plan::ExecutionPlan;
+use datafusion_common::Result;
+
+use std::sync::Arc;
+
+/// a Trait for marking tree node types that are rewritable
+pub trait TreeNodeRewritable: Clone {

Review Comment:
   I am not sure the term `TreeNode` is used much in this codebase
   
   Perhaps we could name this trait `ExecutionPlanRewritable` or something 
that was more consistent with it being used for physical plans 
   
   We could do this perhaps as a follow on refactoring (and also make the 
same change to the `TreeNodeRewritable` in physical_expr as well)



##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -477,6 +473,66 @@ impl Partitioning {
             RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
         }
     }
+
+    /// Returns true when the guarantees made by this [[Partitioning]] are 
sufficient to
+    /// satisfy the partitioning scheme mandated by the `required` 
[[Distribution]]
+    pub fn satisfy<F: FnOnce() -> EquivalenceProperties>(
+        &self,
+        required: Distribution,
+        equal_properties: F,
+    ) -> bool {
+        match required {
+            Distribution::UnspecifiedDistribution => true,
+            Distribution::SinglePartition if self.partition_count() == 1 => 
true,
+            Distribution::HashPartitioned(required_exprs) => {
+                match self {
+                    // Here we do not check the partition count for hash 
partitioning and assumes the partition count
+                    // and hash functions in the system are the same. In 
future if we plan to support storage partition-wise joins,
+                    // then we need to have the partition count and hash 
functions validation.
+                    Partitioning::Hash(partition_exprs, _) => {
+                        let fast_match =
+                            expr_list_eq_strict_order(&required_exprs, 
partition_exprs);
+                        // If the required exprs do not match, need to 
leverage the eq_properties provided by the child
+                        // and normalize both exprs based on the eq_properties
+                        if !fast_match {
+                            let eq_properties = equal_properties();
+                            let eq_classes = eq_properties.classes();
+                            if !eq_classes.is_empty() {
+                                let normalized_required_exprs = required_exprs
+                                    .iter()
+                                    .map(|e| {
+                                        
normalize_expr_with_equivalence_properties(
+                                            e.clone(),
+                                            eq_classes,
+                                        )
+                                    })
+                                    .collect::<Vec<_>>();
+                                let normalized_partition_exprs = 
partition_exprs
+                                    .iter()
+                                    .map(|e| {
+                                        
normalize_expr_with_equivalence_properties(
+                                            e.clone(),
+                                            eq_classes,
+                                        )
+                                    })
+                                    .collect::<Vec<_>>();
+                                expr_list_eq_strict_order(

Review Comment:
   👍  this is a nice formulation 



##########
datafusion/core/src/physical_optimizer/enforcement.rs:
##########
@@ -0,0 +1,2001 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Enforcement optimizer rules are used to make sure the plan's Distribution 
and Ordering
+//! requirements are met by inserting necessary [[RepartitionExec]] and 
[[SortExec]].
+//!
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::joins::{
+    CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::windows::WindowAggExec;
+use crate::physical_plan::Partitioning;
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, 
ExecutionPlan};
+use crate::prelude::SessionConfig;
+use datafusion_expr::logical_plan::JoinType;
+use datafusion_physical_expr::equivalence::EquivalenceProperties;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::expressions::NoOp;
+use datafusion_physical_expr::{
+    expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
+    normalize_sort_expr_with_equivalence_properties, PhysicalExpr, 
PhysicalSortExpr,
+};
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// BasicEnforcement rule, it ensures the Distribution and Ordering 
requirements are met
+/// in the strictest way. It might add additional [[RepartitionExec]] to the 
plan tree
+/// and give a non-optimal plan, but it can avoid the possible data skew in 
joins
+///
+/// For example for a HashJoin with keys(a, b, c), the required 
Distribution(a, b, c) can be satisfied by
+/// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), 
(a), (b), (c), ( )].
+///
+/// This rule only chooses the exactly match and satisfies the Distribution(a, 
b, c) by a HashPartition(a, b, c).
+#[derive(Default)]
+pub struct BasicEnforcement {}
+
+impl BasicEnforcement {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for BasicEnforcement {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &SessionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let target_partitions = config.target_partitions;
+        let top_down_join_key_reordering = config.top_down_join_key_reordering;
+        let new_plan = if top_down_join_key_reordering {
+            // Run a top-down process to adjust input key ordering recursively
+            adjust_input_keys_down_recursively(plan, vec![])?
+        } else {
+            plan
+        };
+        // Distribution and Ordering enforcement need to be applied bottom-up.
+        new_plan.transform_up(&{
+            |plan| {
+                let adjusted = if !top_down_join_key_reordering {
+                    reorder_join_keys_to_inputs(plan)
+                } else {
+                    plan
+                };
+                Some(ensure_distribution_and_ordering(
+                    adjusted,
+                    target_partitions,
+                ))
+            }
+        })
+    }
+
+    fn name(&self) -> &str {
+        "BasicEnforcement"
+    }
+}
+
+/// When the physical planner creates the Joins, the ordering of join keys is 
from the original query.

Review Comment:
   Thank you for this comment -- can you explain a little more about why the 
ordering of join keys in the plan affects the output partitioning? Is it 
because the output partitioning of a join on `(a, b)` is always `hash(a, b)` 
which might not be the same as `hash(b, a)`? 



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -231,26 +266,42 @@ mod tests {
     }
 
     #[test]
-    fn merge_equivalence_properties_with_alias_test() -> Result<()> {
-        let mut eq_properties = EquivalenceProperties::new();
-        let mut alias_map = HashMap::new();
-        alias_map.insert(
-            Column::new("a", 0),
-            vec![Column::new("a1", 1), Column::new("a2", 2)],
-        );
+    fn project_equivalence_properties_test() -> Result<()> {
+        let input_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("b", DataType::Int64, true),
+            Field::new("c", DataType::Int64, true),
+        ]));
 
-        eq_properties.merge_properties_with_alias(&alias_map);
-        assert_eq!(eq_properties.classes().len(), 1);
-        assert_eq!(eq_properties.classes()[0].len(), 3);
+        let mut input_properties = EquivalenceProperties::new(input_schema);
+        let new_condition = (&Column::new("a", 0), &Column::new("b", 1));
+        input_properties.add_equal_conditions(new_condition);
+        let new_condition = (&Column::new("b", 1), &Column::new("c", 2));
+        input_properties.add_equal_conditions(new_condition);
+
+        let out_schema = Arc::new(Schema::new(vec![
+            Field::new("a1", DataType::Int64, true),
+            Field::new("a2", DataType::Int64, true),
+            Field::new("a3", DataType::Int64, true),
+            Field::new("a4", DataType::Int64, true),
+        ]));
 
         let mut alias_map = HashMap::new();
         alias_map.insert(
             Column::new("a", 0),
-            vec![Column::new("a3", 1), Column::new("a4", 2)],
+            vec![
+                Column::new("a1", 0),
+                Column::new("a2", 1),
+                Column::new("a3", 2),
+                Column::new("a4", 3),
+            ],
         );
-        eq_properties.merge_properties_with_alias(&alias_map);
-        assert_eq!(eq_properties.classes().len(), 1);
-        assert_eq!(eq_properties.classes()[0].len(), 5);
+        let mut out_properties = EquivalenceProperties::new(out_schema);
+
+        project_equivalence_properties(input_properties, &alias_map, &mut 
out_properties);
+        assert_eq!(out_properties.classes().len(), 1);
+        assert_eq!(out_properties.classes()[0].len(), 4);

Review Comment:
   I suggest actually validating the contents of these equivalence classes 
somehow (rather than just their sizes)



##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -477,6 +473,66 @@ impl Partitioning {
             RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
         }
     }
+
+    /// Returns true when the guarantees made by this [[Partitioning]] are 
sufficient to
+    /// satisfy the partitioning scheme mandated by the `required` 
[[Distribution]]
+    pub fn satisfy<F: FnOnce() -> EquivalenceProperties>(
+        &self,
+        required: Distribution,
+        equal_properties: F,
+    ) -> bool {
+        match required {
+            Distribution::UnspecifiedDistribution => true,

Review Comment:
   Perhaps what @jackwener was getting at is that adding additional tests could 
also help to document the intent of `satisfy()` as well as making the coverage 
clear, even if they did not add additional coverage
   
   I would be interested in helping to write such tests as part of my review of 
this code (as it would help me gain a better understanding)
   
   



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -965,32 +934,10 @@ impl DefaultPhysicalPlanner {
                     if session_state.config.target_partitions > 1
                         && session_state.config.repartition_joins
                     {
-                        let (left_expr, right_expr) = join_on
-                            .iter()
-                            .map(|(l, r)| {
-                                (
-                                    Arc::new(l.clone()) as Arc<dyn 
PhysicalExpr>,
-                                    Arc::new(r.clone()) as Arc<dyn 
PhysicalExpr>,
-                                )
-                            })
-                            .unzip();
-
                         // Use hash partition by default to parallelize hash 
joins
                         Ok(Arc::new(HashJoinExec::try_new(
-                            Arc::new(RepartitionExec::try_new(

Review Comment:
   I agree 100%



##########
datafusion/core/src/execution/context.rs:
##########
@@ -1565,7 +1569,8 @@ impl SessionState {
             )));
         }
         physical_optimizers.push(Arc::new(Repartition::new()));
-        physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
+        physical_optimizers.push(Arc::new(BasicEnforcement::new()));
+        // 
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));

Review Comment:
   When this call is removed, there is no other code that calls 
`AddCoalescePartitionsExec` and thus I think we should remove that entire 
module. We could do so as a follow on PR if you want to keep this one smaller



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to