alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012139283


##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c3"])?
+            .with_column_renamed("c2.c1", "c2_c1")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 =
+            left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+        let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+
+        let union = join1.union(join2)?;
+
+        let union_rows = union.collect().await?;
+
+        assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+
+        let physical_plan = union.create_physical_plan().await?;
+        let default_partition_count =
+            SessionContext::new().copied_config().target_partitions;
+        assert_eq!(
+            physical_plan.output_partitioning().partition_count(),
+            default_partition_count
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn non_partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c2"])?
+            .with_column_renamed("c2.c1", "c2_c1")?
+            .with_column_renamed("c2.c2", "c2_c2")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 = left.join(
+            right.clone(),
+            JoinType::Inner,
+            &["c1", "c2"],
+            &["c2_c1", "c2_c2"],
+            None,
+        )?;
+
+        // join key ordering is different

Review Comment:
   👍 



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;

Review Comment:
   I think typically in Rust such a sentinel is signaled using `Option`.
   
   So like
   
   ```rust
       let mut idx1: Option<usize> = None;
       let mut idx2: Option<usize> = None;
   ```
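   
   As a minimal sketch (not the PR's code), the merge step at the end of this
   function could then read as follows, assuming the loop above sets
   `idx1 = Some(idx)` / `idx2 = Some(idx)`:
   
   ```rust
       match (idx1, idx2) {
           // the two columns were found in different classes: merge them
           (Some(i1), Some(i2)) if i1 != i2 => {
               let second_properties = eq_properties[i2].clone();
               let first_properties = eq_properties.get_mut(i1).unwrap();
               for prop in second_properties.iter() {
                   if !first_properties.contains(prop) {
                       first_properties.insert(prop.clone());
                   }
               }
               eq_properties.remove(i2);
           }
           // neither column was seen before: start a new equivalence class
           (None, None) => eq_properties.push(EquivalenceProperties::new(
               new_condition.0.clone(),
               vec![new_condition.1.clone()],
           )),
           // all other cases were already handled inside the loop
           _ => {}
       }
   ```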



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -255,25 +276,58 @@ impl ExecutionPlan for AggregateExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+        match &self.mode {
+            AggregateMode::Partial => {
+                // Partial Aggregation will not change the output partitioning but need to respect the Alias
+                let input_partition = self.input.output_partitioning();
+                match input_partition {
+                    Partitioning::Hash(exprs, part) => {
+                        let normalized_exprs = exprs
+                            .into_iter()
+                            .map(|expr| {
+                                normalize_out_expr_with_alias_schema(
+                                    expr,
+                                    &self.alias_map,
+                                    &self.schema,
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        Partitioning::Hash(normalized_exprs, part)
+                    }
+                    _ => input_partition,
+                }
+            }
+            // Final Aggregation's output partitioning is the same as its real input
+            _ => self.input.output_partitioning(),
+        }
     }
 
+    // TODO check the output ordering of AggregateExec

Review Comment:
   // is it still TODO?



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   Having `EquivalenceProperties` would also provide a single location to add
   docstrings explaining these structures, their assumptions, and what they are
   good for.
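   
   For example, a hedged sketch of the kind of docstring it could carry (the
   wording is illustrative, not taken from the PR):
   
   ```rust
   /// A set of columns known to hold identical values at a given point in the
   /// plan, e.g. because of a join equality predicate or an aliasing
   /// projection. Operators can consult these classes to recognize that two
   /// partitionings or orderings are compatible and avoid unnecessary
   /// repartitioning or re-sorting.
   #[derive(Debug, Clone)]
   pub struct EquivalenceProperties {
       // ...
   }
   ```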



##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c3"])?
+            .with_column_renamed("c2.c1", "c2_c1")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 =

Review Comment:
   Could you possibly add some comments here about what this test is verifying?
   It seems like perhaps it is verifying that when the joins are on the same key
   the partitioning is the same, and thus the union can be done without bringing
   everything into a single stream?
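   
   As a sketch of the kind of comment that could go above the test (the wording
   here is mine, not the PR author's):
   
   ```rust
   // Verify that when both union inputs are hash-partitioned on the same join
   // keys, the union keeps that partitioning instead of coalescing everything
   // into a single partition.
   ```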



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }

Review Comment:
   You could also use a match statement here and let the compiler check that all
   the important cases are covered:
   
   ```suggestion
           match (contains_first, contains_second) {
               (true, false) => {
                   prop.insert(new_condition.1.clone());
                   idx1 = idx as i32;
               }
               (false, true) => {
                   prop.insert(new_condition.0.clone());
                   idx2 = idx as i32;
               }
               (true, true) => {
                   idx1 = idx as i32;
                   idx2 = idx as i32;
                   break;
               }
               (false, false) => {}
           }
   ```



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   I wonder if `EquivalenceClass` is a more specific name?
   
   Then you could make
   
   ```rust
   struct EquivalenceProperties {
     classes: Vec<EquivalenceClass>
   }
   ```
   
   And move functions like `truncate_equivalence_properties_not_in_schema` onto
   it:
   
   ```rust
   impl EquivalenceProperties {
     fn truncate_equivalence_properties_not_in_schema(&self, ..)
   }
   ```
   
   I don't think it is required, but it might keep the code easier to reason 
about / keep it behind an abstraction
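   
   A minimal sketch of that shape, assuming the `Column`-based contents used
   elsewhere in this PR (the method name and fields here are illustrative, not
   part of the actual change):
   
   ```rust
   /// One class of columns known to be equal to each other.
   #[derive(Debug, Clone)]
   pub struct EquivalenceClass {
       columns: Vec<Column>,
   }
   
   /// All equivalence classes known at a given plan node.
   #[derive(Debug, Clone)]
   pub struct EquivalenceProperties {
       classes: Vec<EquivalenceClass>,
   }
   
   impl EquivalenceProperties {
       /// Drop columns that are no longer valid for `schema`, and discard
       /// classes that shrink below two members.
       fn truncate_not_in_schema(&mut self, schema: &SchemaRef) {
           for class in self.classes.iter_mut() {
               class.columns.retain(|column| {
                   schema
                       .index_of(column.name())
                       .map(|idx| idx == column.index())
                       .unwrap_or(false)
               });
           }
           self.classes.retain(|class| class.columns.len() > 1);
       }
   }
   ```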



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, 
e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec 
of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", 
"b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn 
PhysicalExpr>> {

Review Comment:
   This is called `split_conjunction` in the logical optimizer -- perhaps it 
could be called the same thing in the physical layer. The logical expr 
implementation also avoids creating quite as many `Vec`s
   
   
   https://github.com/apache/arrow-datafusion/blob/345234550712173477e7807ba2cf67dd2ffb9ed5/datafusion/optimizer/src/utils.rs#L58-L78
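   
   For reference, a minimal accumulator-based sketch in the spirit of the
   logical optimizer's version (it keeps this function's current behavior of
   producing nothing for inputs that are not a `BinaryExpr`):
   
   ```rust
   /// Split a conjunctive predicate into its AND-ed terms, reusing a single
   /// Vec instead of allocating one per recursive call.
   pub fn split_conjunction(
       predicate: &Arc<dyn PhysicalExpr>,
   ) -> Vec<&Arc<dyn PhysicalExpr>> {
       let mut exprs = vec![];
       split_conjunction_impl(predicate, &mut exprs);
       exprs
   }
   
   fn split_conjunction_impl<'a>(
       predicate: &'a Arc<dyn PhysicalExpr>,
       exprs: &mut Vec<&'a Arc<dyn PhysicalExpr>>,
   ) {
       match predicate.as_any().downcast_ref::<BinaryExpr>() {
           Some(binary) => match binary.op() {
               // AND: recurse into both sides and collect the leaves
               Operator::And => {
                   split_conjunction_impl(binary.left(), exprs);
                   split_conjunction_impl(binary.right(), exprs);
               }
               // any other binary expression is itself one conjunct
               _ => exprs.push(predicate),
           },
           // non-binary expressions are ignored, as in `split_predicate`
           None => {}
       }
   }
   ```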



##########
datafusion/core/src/physical_plan/joins/cross_join.rs:
##########
@@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec {
         )?))
     }
 
+    // TODO optimize CrossJoin implementation to generate M * N partitions
     fn output_partitioning(&self) -> Partitioning {
-        self.right.output_partitioning()
+        let left_columns_len = self.left.schema().fields.len();
+        adjust_right_output_partitioning(
+            self.right.output_partitioning(),
+            left_columns_len,
+        )
     }
 
+    // TODO check the output ordering of CrossJoin

Review Comment:
   is this still a todo?



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -186,13 +194,26 @@ impl AggregateExec {
 
         let schema = Arc::new(schema);
 
+        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();

Review Comment:
   Can you explain what this code is for? It doesn't seem correct to me, as I
   don't understand the circumstances under which the output would be different 🤔
   
   It seems like in this case the input logical plan was perhaps incorrect?



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]

Review Comment:
   What would you think about moving this into 
`datafusion/physical-expr/src/equivalence.rs` or something? Then we could move 
all the code that deals with equivalence classes into that module and keep them 
and the tests together
   
   



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -231,6 +246,38 @@ impl RecordBatchStream for FilterExecStream {
     }
 }
 
+/// Return the equals Column-Pairs and Non-equals Column-Pairs
+fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {

Review Comment:
   Perhaps this would be better in utils.rs



##########
datafusion/core/src/physical_plan/coalesce_batches.rs:
##########
@@ -96,12 +96,15 @@ impl ExecutionPlan for CoalesceBatchesExec {
         self.input.output_partitioning()
     }
 
+    // Depends on how the CoalesceBatches was implemented, it is possible to keep

Review Comment:
   There is also `SortPreservingMerge` that can be used to preserve order but 
there are tradeoffs there (specifically it takes more effort to keep the sort 
order than it does to append batches together)



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");

Review Comment:
   I don't know why this would generate a warning -- can't this occur with a
   query like `SELECT ROW_NUMBER() OVER () FROM foo` (as in an empty OVER clause)?



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");
+            vec![Distribution::SinglePartition]
         } else {
-            Distribution::UnspecifiedDistribution
+            //TODO support PartitionCollections if there is no common partition columns in the window_expr
+            vec![Distribution::HashPartitioned(self.partition_keys.clone())]

Review Comment:
   👍  I agree this sounds good



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());
+                }
+                find_match = true;
+                break;
+            }
+        }
+        if !find_match {
+            eq_properties
+                .push(EquivalenceProperties::new(column.clone(), columns.clone()));
+        }
+    }
+}
+
+pub fn truncate_equivalence_properties_not_in_schema(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    schema: &SchemaRef,
+) {
+    for props in eq_properties.iter_mut() {
+        let mut columns_to_remove = vec![];
+        for column in props.iter() {
+            if let Ok(idx) = schema.index_of(column.name()) {
+                if idx != column.index() {
+                    columns_to_remove.push(column.clone());
+                }
+            } else {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            props.remove(&column);
+        }
+    }
+    eq_properties.retain(|props| props.len() > 1);
+}
+
+/// Normalize the output expressions based on Alias Map and SchemaRef.
+///
+/// 1) If there is mapping in Alias Map, replace the Column in the output expressions with the 1st Column in Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column with a place holder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exactly matching, return None, no need to do the transform
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalenceProperties],
+) -> Arc<dyn PhysicalExpr> {
+    let mut normalized = expr.clone();
+    if let Some(column) = expr.as_any().downcast_ref::<Column>() {

Review Comment:
   Does this need to recursively rewrite exprs?
   
   Like what if `expr` was `A + B` and you had an equivalence class with `B = C`
   
   Wouldn't you have to rewrite `A + B` into `A + C`? But I don't see this code
   recursing.
   
   This kind of rewrite could be tested as well I think
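   
   For illustration, a hedged sketch of a recursive variant built on the same
   `transform` helper that `normalize_out_expr_with_alias_schema` uses above.
   It assumes `EquivalenceProperties::iter()` yields the columns of a class with
   the representative first, which is an assumption about this PR's API rather
   than a documented guarantee:
   
   ```rust
   pub fn normalize_expr_with_equivalence_properties_recursive(
       expr: Arc<dyn PhysicalExpr>,
       eq_properties: &[EquivalenceProperties],
   ) -> Arc<dyn PhysicalExpr> {
       expr.clone()
           .transform(&|expr| {
               // Rewrite every Column leaf, so `A + B` becomes `A + C` when an
               // equivalence class relates `B` and `C` (with `C` as its head).
               expr.as_any().downcast_ref::<Column>().and_then(|column| {
                   eq_properties
                       .iter()
                       .find(|class| class.contains(column))
                       .and_then(|class| class.iter().next())
                       .map(|head| {
                           let normalized: Arc<dyn PhysicalExpr> = Arc::new(head.clone());
                           normalized
                       })
               })
           })
           .unwrap_or(expr)
   }
   ```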



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
