alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012139283
##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn partition_aware_union() -> Result<()> {
+ let left = test_table().await?.select_columns(&["c1", "c2"])?;
+ let right = test_table_with_name("c2")
+ .await?
+ .select_columns(&["c1", "c3"])?
+ .with_column_renamed("c2.c1", "c2_c1")?;
+
+ let left_rows = left.collect().await?;
+ let right_rows = right.collect().await?;
+ let join1 =
+ left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"],
None)?;
+ let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"],
None)?;
+
+ let union = join1.union(join2)?;
+
+ let union_rows = union.collect().await?;
+
+ assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+ assert_eq!(100, right_rows.iter().map(|x|
x.num_rows()).sum::<usize>());
+ assert_eq!(4016, union_rows.iter().map(|x|
x.num_rows()).sum::<usize>());
+
+ let physical_plan = union.create_physical_plan().await?;
+ let default_partition_count =
+ SessionContext::new().copied_config().target_partitions;
+ assert_eq!(
+ physical_plan.output_partitioning().partition_count(),
+ default_partition_count
+ );
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn non_partition_aware_union() -> Result<()> {
+ let left = test_table().await?.select_columns(&["c1", "c2"])?;
+ let right = test_table_with_name("c2")
+ .await?
+ .select_columns(&["c1", "c2"])?
+ .with_column_renamed("c2.c1", "c2_c1")?
+ .with_column_renamed("c2.c2", "c2_c2")?;
+
+ let left_rows = left.collect().await?;
+ let right_rows = right.collect().await?;
+ let join1 = left.join(
+ right.clone(),
+ JoinType::Inner,
+ &["c1", "c2"],
+ &["c2_c1", "c2_c2"],
+ None,
+ )?;
+
+ // join key ordering is different
Review Comment:
👍
##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1,
e2)| e1.eq(e2))
}
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec
of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2",
"b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn
PhysicalExpr>> {
+ match predicate.as_any().downcast_ref::<BinaryExpr>() {
+ Some(binary) => match binary.op() {
+ Operator::And => {
+ let mut vec1 = split_predicate(binary.left());
+ let vec2 = split_predicate(binary.right());
+ vec1.extend(vec2);
+ vec1
+ }
+ _ => vec![predicate],
+ },
+ None => vec![],
+ }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+ eq_properties: &mut Vec<EquivalenceProperties>,
+ new_condition: (&Column, &Column),
+) {
+ let mut idx1 = -1i32;
+ let mut idx2 = -1i32;
Review Comment:
I think typically in Rust such a sentinel is signaled using `Option`.
So like:
```rust
let mut idx1: Option<usize> = None;
let mut idx2: Option<usize> = None;
```
##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -255,25 +276,58 @@ impl ExecutionPlan for AggregateExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
+ match &self.mode {
+ AggregateMode::Partial => {
+ // Partial Aggregation will not change the output partitioning
but need to respect the Alias
+ let input_partition = self.input.output_partitioning();
+ match input_partition {
+ Partitioning::Hash(exprs, part) => {
+ let normalized_exprs = exprs
+ .into_iter()
+ .map(|expr| {
+ normalize_out_expr_with_alias_schema(
+ expr,
+ &self.alias_map,
+ &self.schema,
+ )
+ })
+ .collect::<Vec<_>>();
+ Partitioning::Hash(normalized_exprs, part)
+ }
+ _ => input_partition,
+ }
+ }
+ // Final Aggregation's output partitioning is the same as its real
input
+ _ => self.input.output_partitioning(),
+ }
}
+ // TODO check the output ordering of AggregateExec
Review Comment:
// is it still TODO?
##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
}
}
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {
Review Comment:
Having `EquivalenceProperties` would also provide a single location to add
docstrings explaining the structures, and their assumptions and what they are
good for
##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn partition_aware_union() -> Result<()> {
+ let left = test_table().await?.select_columns(&["c1", "c2"])?;
+ let right = test_table_with_name("c2")
+ .await?
+ .select_columns(&["c1", "c3"])?
+ .with_column_renamed("c2.c1", "c2_c1")?;
+
+ let left_rows = left.collect().await?;
+ let right_rows = right.collect().await?;
+ let join1 =
Review Comment:
Could you possibly add some comments here about what this test is verifying?
It seems like perhaps it is verifying that when the joins are on the same key
the partitioning is the same and thus union can be done without bringing
everything to a single stream?
##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1,
e2)| e1.eq(e2))
}
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec
of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2",
"b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn
PhysicalExpr>> {
+ match predicate.as_any().downcast_ref::<BinaryExpr>() {
+ Some(binary) => match binary.op() {
+ Operator::And => {
+ let mut vec1 = split_predicate(binary.left());
+ let vec2 = split_predicate(binary.right());
+ vec1.extend(vec2);
+ vec1
+ }
+ _ => vec![predicate],
+ },
+ None => vec![],
+ }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+ eq_properties: &mut Vec<EquivalenceProperties>,
+ new_condition: (&Column, &Column),
+) {
+ let mut idx1 = -1i32;
+ let mut idx2 = -1i32;
+ for (idx, prop) in eq_properties.iter_mut().enumerate() {
+ let contains_first = prop.contains(new_condition.0);
+ let contains_second = prop.contains(new_condition.1);
+ if contains_first && !contains_second {
+ prop.insert(new_condition.1.clone());
+ idx1 = idx as i32;
+ } else if !contains_first && contains_second {
+ prop.insert(new_condition.0.clone());
+ idx2 = idx as i32;
+ } else if contains_first && contains_second {
+ idx1 = idx as i32;
+ idx2 = idx as i32;
+ break;
+ }
Review Comment:
You could also use a match statement here and let the compiler check that all
important cases are covered:
```suggestion
match (contains_first, contains_second) {
(true, false) => {
prop.insert(new_condition.1.clone());
idx1 = idx as i32;
}
(false, true)=> {
prop.insert(new_condition.0.clone());
idx2 = idx as i32;
}
(true, true) => {
idx1 = idx as i32;
idx2 = idx as i32;
break;
}
(false, false) => {}
}
```
##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
}
}
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {
Review Comment:
I wonder if `EquivalenceClass` is a more specific name?
Then you could make
```rust
struct EquivalenceProperties {
classes: Vec<EquivalenceClass>
}
```
And move functions like `truncate_equivalence_properties_not_in_schema` on
to
```rust
impl EquivalenceProperties {
fn truncate_equivalence_properties_not_in_schema(&self, ..)
}
```
I don't think it is required, but it might keep the code easier to reason
about / keep it behind an abstraction
##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1,
e2)| e1.eq(e2))
}
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec
of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2",
"b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn
PhysicalExpr>> {
Review Comment:
This is called `split_conjunction` in the logical optimizer -- perhaps it
could be called the same thing in the physical layer. The logical expr
implementation also avoids creating quite as many `Vec`s
https://github.com/apache/arrow-datafusion/blob/345234550712173477e7807ba2cf67dd2ffb9ed5/datafusion/optimizer/src/utils.rs#L58-L78
##########
datafusion/core/src/physical_plan/joins/cross_join.rs:
##########
@@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec {
)?))
}
+ // TODO optimize CrossJoin implementation to generate M * N partitions
fn output_partitioning(&self) -> Partitioning {
- self.right.output_partitioning()
+ let left_columns_len = self.left.schema().fields.len();
+ adjust_right_output_partitioning(
+ self.right.output_partitioning(),
+ left_columns_len,
+ )
}
+ // TODO check the output ordering of CrossJoin
Review Comment:
is this still a todo?
##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -186,13 +194,26 @@ impl AggregateExec {
let schema = Arc::new(schema);
+ let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
Review Comment:
Can you explain what this code is for? It doesn't seem correct to me as I
don't understand the circumstances under which the output would be different 🤔
It seems like in this case the input logical plan maybe was incorrect?
##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
}
}
+#[derive(Debug, Clone)]
Review Comment:
What would you think about moving this into
`datafusion/physical-expr/src/equivalence.rs` or something? Then we could move
all the code that deals with equivalence classes into that module and keep them
and the tests together
##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -231,6 +246,38 @@ impl RecordBatchStream for FilterExecStream {
}
}
+/// Return the equals Column-Pairs and Non-equals Column-Pairs
+fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) ->
EqualAndNonEqual {
Review Comment:
Perhaps this would be better in utils.rs
##########
datafusion/core/src/physical_plan/coalesce_batches.rs:
##########
@@ -96,12 +96,15 @@ impl ExecutionPlan for CoalesceBatchesExec {
self.input.output_partitioning()
}
+ // Depends on how the CoalesceBatches was implemented, it is possible to
keep
Review Comment:
There is also `SortPreservingMerge` that can be used to preserve order but
there are tradeoffs there (specifically it takes more effort to keep the sort
order than it does to append batches together)
##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
true
}
- fn relies_on_input_order(&self) -> bool {
- true
+ fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+ let sort_keys = self.sort_keys.as_deref();
+ vec![sort_keys]
}
- fn required_child_distribution(&self) -> Distribution {
- if self
- .window_expr()
- .iter()
- .all(|expr| expr.partition_by().is_empty())
- {
- Distribution::SinglePartition
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.partition_keys.is_empty() {
+ warn!("No partition defined for WindowAggExec!!!");
Review Comment:
I don't know why this would generate a warning -- can't this occur with a
query like `SELECT ROW_NUMBER() OVER () FROM foo` (as in an empty over clause)?
##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
true
}
- fn relies_on_input_order(&self) -> bool {
- true
+ fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+ let sort_keys = self.sort_keys.as_deref();
+ vec![sort_keys]
}
- fn required_child_distribution(&self) -> Distribution {
- if self
- .window_expr()
- .iter()
- .all(|expr| expr.partition_by().is_empty())
- {
- Distribution::SinglePartition
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.partition_keys.is_empty() {
+ warn!("No partition defined for WindowAggExec!!!");
+ vec![Distribution::SinglePartition]
} else {
- Distribution::UnspecifiedDistribution
+ //TODO support PartitionCollections if there is no common
partition columns in the window_expr
+ vec![Distribution::HashPartitioned(self.partition_keys.clone())]
Review Comment:
👍 I agree this sounds good
##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1,
e2)| e1.eq(e2))
}
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec
of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2",
"b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn
PhysicalExpr>> {
+ match predicate.as_any().downcast_ref::<BinaryExpr>() {
+ Some(binary) => match binary.op() {
+ Operator::And => {
+ let mut vec1 = split_predicate(binary.left());
+ let vec2 = split_predicate(binary.right());
+ vec1.extend(vec2);
+ vec1
+ }
+ _ => vec![predicate],
+ },
+ None => vec![],
+ }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+ eq_properties: &mut Vec<EquivalenceProperties>,
+ new_condition: (&Column, &Column),
+) {
+ let mut idx1 = -1i32;
+ let mut idx2 = -1i32;
+ for (idx, prop) in eq_properties.iter_mut().enumerate() {
+ let contains_first = prop.contains(new_condition.0);
+ let contains_second = prop.contains(new_condition.1);
+ if contains_first && !contains_second {
+ prop.insert(new_condition.1.clone());
+ idx1 = idx as i32;
+ } else if !contains_first && contains_second {
+ prop.insert(new_condition.0.clone());
+ idx2 = idx as i32;
+ } else if contains_first && contains_second {
+ idx1 = idx as i32;
+ idx2 = idx as i32;
+ break;
+ }
+ }
+
+ if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+ // need to merge the two existing properties
+ let second_properties = eq_properties.get(idx2 as
usize).unwrap().clone();
+ let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+ for prop in second_properties.iter() {
+ if !first_properties.contains(prop) {
+ first_properties.insert(prop.clone());
+ }
+ }
+ eq_properties.remove(idx2 as usize);
+ } else if idx1 == -1 && idx2 == -1 {
+ // adding new pairs
+ eq_properties.push(EquivalenceProperties::new(
+ new_condition.0.clone(),
+ vec![new_condition.1.clone()],
+ ))
+ }
+}
+
+pub fn remove_equivalence_properties(
+ eq_properties: &mut Vec<EquivalenceProperties>,
+ remove_condition: (&Column, &Column),
+) {
+ let mut match_idx = -1i32;
+ for (idx, prop) in eq_properties.iter_mut().enumerate() {
+ let contains_first = prop.contains(remove_condition.0);
+ let contains_second = prop.contains(remove_condition.1);
+ if contains_first && contains_second {
+ match_idx = idx as i32;
+ break;
+ }
+ }
+ if match_idx >= 0 {
+ let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+ matches.remove(remove_condition.0);
+ matches.remove(remove_condition.1);
+ if matches.len() <= 1 {
+ eq_properties.remove(match_idx as usize);
+ }
+ }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+ eq_properties: &mut Vec<EquivalenceProperties>,
+ alias_map: &HashMap<Column, Vec<Column>>,
+) {
+ for (column, columns) in alias_map {
+ let mut find_match = false;
+ for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+ if prop.contains(column) {
+ for col in columns {
+ prop.insert(col.clone());
+ }
+ find_match = true;
+ break;
+ }
+ }
+ if !find_match {
+ eq_properties
+ .push(EquivalenceProperties::new(column.clone(),
columns.clone()));
+ }
+ }
+}
+
+pub fn truncate_equivalence_properties_not_in_schema(
+ eq_properties: &mut Vec<EquivalenceProperties>,
+ schema: &SchemaRef,
+) {
+ for props in eq_properties.iter_mut() {
+ let mut columns_to_remove = vec![];
+ for column in props.iter() {
+ if let Ok(idx) = schema.index_of(column.name()) {
+ if idx != column.index() {
+ columns_to_remove.push(column.clone());
+ }
+ } else {
+ columns_to_remove.push(column.clone());
+ }
+ }
+ for column in columns_to_remove {
+ props.remove(&column);
+ }
+ }
+ eq_properties.retain(|props| props.len() > 1);
+}
+
+/// Normalize the output expressions based on Alias Map and SchemaRef.
+///
+/// 1) If there is mapping in Alias Map, replace the Column in the output
expressions with the 1st Column in Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column
with a place holder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+ expr: Arc<dyn PhysicalExpr>,
+ alias_map: &HashMap<Column, Vec<Column>>,
+ schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+ let expr_clone = expr.clone();
+ expr_clone
+ .transform(&|expr| {
+ let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+ match expr.as_any().downcast_ref::<Column>() {
+ Some(column) => {
+ let out = alias_map
+ .get(column)
+ .map(|c| {
+ let out_col: Arc<dyn PhysicalExpr> =
+ Arc::new(c[0].clone());
+ out_col
+ })
+ .or_else(|| match schema.index_of(column.name()) {
+ // Exactly matching, return None, no need to
do the transform
+ Ok(idx) if column.index() == idx => None,
+ _ => {
+ let out_col: Arc<dyn PhysicalExpr> =
+
Arc::new(UnKnownColumn::new(column.name()));
+ Some(out_col)
+ }
+ });
+ out
+ }
+ None => None,
+ };
+ normalized_form
+ })
+ .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+ expr: Arc<dyn PhysicalExpr>,
+ eq_properties: &[EquivalenceProperties],
+) -> Arc<dyn PhysicalExpr> {
+ let mut normalized = expr.clone();
+ if let Some(column) = expr.as_any().downcast_ref::<Column>() {
Review Comment:
Does this need to recursively rewrite exprs?
Like what if `expr` was `A + B` and you had an equivalence class with `B = C`?
Wouldn't you have to rewrite `A + B` into `A + C`? But I don't see this code
recursing.
This kind of rewrite could be tested as well I think
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]