This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7aad6c7f45 Fix column mapping in output_ordering() and 
output_partitioning() for ProjectionExec and AggregateExec (#6113)
7aad6c7f45 is described below

commit 7aad6c7f45657f6594552e0244aa141a76d7448d
Author: mingmwang <[email protected]>
AuthorDate: Tue Apr 25 22:45:55 2023 +0800

    Fix column mapping in output_ordering() and output_partitioning() for 
ProjectionExec and AggregateExec (#6113)
    
    * Fix column mapping in output_ordering() and output_partitioning() for 
ProjectionExec and AggregateExec
    
    * tiny
    
    * remove issue link
---
 .../core/src/physical_plan/aggregates/mod.rs       | 25 ++++++++---------
 datafusion/core/src/physical_plan/projection.rs    | 32 +++++++++-------------
 .../tests/sqllogictests/test_files/groupby.slt     |  7 -----
 datafusion/physical-expr/src/lib.rs                |  2 +-
 datafusion/physical-expr/src/utils.rs              | 25 ++++++-----------
 5 files changed, 34 insertions(+), 57 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs 
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 3ec9a6d712..57525b387f 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -50,7 +50,7 @@ pub use datafusion_expr::AggregateFunction;
 use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
 use datafusion_physical_expr::equivalence::project_equivalence_properties;
 pub use datafusion_physical_expr::expressions::create_aggregate_expr;
-use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
+use datafusion_physical_expr::normalize_out_expr_with_columns_map;
 
 /// Hash aggregate modes
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -203,9 +203,9 @@ pub struct AggregateExec {
     /// same as input.schema() but for the final aggregate it will be the same 
as the input
     /// to the partial aggregate
     pub(crate) input_schema: SchemaRef,
-    /// The alias map used to normalize out expressions like Partitioning and 
PhysicalSortExpr
+    /// The columns map used to normalize out expressions like Partitioning 
and PhysicalSortExpr
     /// The key is the column from the input schema and the values are the 
columns from the output schema
-    alias_map: HashMap<Column, Vec<Column>>,
+    columns_map: HashMap<Column, Vec<Column>>,
     /// Execution Metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -230,15 +230,13 @@ impl AggregateExec {
 
         let schema = Arc::new(schema);
 
-        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
+        // construct a map from the input columns to the output columns of the 
Aggregation
+        let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
         for (expression, name) in group_by.expr.iter() {
             if let Some(column) = expression.as_any().downcast_ref::<Column>() 
{
                 let new_col_idx = schema.index_of(name)?;
-                // When the column name is the same, but index does not equal, 
treat it as Alias
-                if (column.name() != name) || (column.index() != new_col_idx) {
-                    let entry = 
alias_map.entry(column.clone()).or_insert_with(Vec::new);
-                    entry.push(Column::new(name, new_col_idx));
-                }
+                let entry = 
columns_map.entry(column.clone()).or_insert_with(Vec::new);
+                entry.push(Column::new(name, new_col_idx));
             };
         }
 
@@ -250,7 +248,7 @@ impl AggregateExec {
             input,
             schema,
             input_schema,
-            alias_map,
+            columns_map,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -358,10 +356,9 @@ impl ExecutionPlan for AggregateExec {
                         let normalized_exprs = exprs
                             .into_iter()
                             .map(|expr| {
-                                normalize_out_expr_with_alias_schema(
+                                normalize_out_expr_with_columns_map(
                                     expr,
-                                    &self.alias_map,
-                                    &self.schema,
+                                    &self.columns_map,
                                 )
                             })
                             .collect::<Vec<_>>();
@@ -408,7 +405,7 @@ impl ExecutionPlan for AggregateExec {
         let mut new_properties = EquivalenceProperties::new(self.schema());
         project_equivalence_properties(
             self.input.equivalence_properties(),
-            &self.alias_map,
+            &self.columns_map,
             &mut new_properties,
         );
         new_properties
diff --git a/datafusion/core/src/physical_plan/projection.rs 
b/datafusion/core/src/physical_plan/projection.rs
index 95d33fbcd3..799de0c191 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -40,7 +40,7 @@ use super::metrics::{BaselineMetrics, 
ExecutionPlanMetricsSet, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
 use crate::execution::context::TaskContext;
 use datafusion_physical_expr::equivalence::project_equivalence_properties;
-use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
+use datafusion_physical_expr::normalize_out_expr_with_columns_map;
 use futures::stream::Stream;
 use futures::stream::StreamExt;
 
@@ -55,9 +55,9 @@ pub struct ProjectionExec {
     input: Arc<dyn ExecutionPlan>,
     /// The output ordering
     output_ordering: Option<Vec<PhysicalSortExpr>>,
-    /// The alias map used to normalize out expressions like Partitioning and 
PhysicalSortExpr
+    /// The columns map used to normalize out expressions like Partitioning 
and PhysicalSortExpr
     /// The key is the column from the input schema and the values are the 
columns from the output schema
-    alias_map: HashMap<Column, Vec<Column>>,
+    columns_map: HashMap<Column, Vec<Column>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -91,15 +91,13 @@ impl ProjectionExec {
             input_schema.metadata().clone(),
         ));
 
-        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
+        // construct a map from the input columns to the output columns of the 
Projection
+        let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
         for (expression, name) in expr.iter() {
             if let Some(column) = expression.as_any().downcast_ref::<Column>() 
{
                 let new_col_idx = schema.index_of(name)?;
-                // When the column name is the same, but index does not equal, 
treat it as Alias
-                if (column.name() != name) || (column.index() != new_col_idx) {
-                    let entry = 
alias_map.entry(column.clone()).or_insert_with(Vec::new);
-                    entry.push(Column::new(name, new_col_idx));
-                }
+                let entry = 
columns_map.entry(column.clone()).or_insert_with(Vec::new);
+                entry.push(Column::new(name, new_col_idx));
             };
         }
 
@@ -110,10 +108,9 @@ impl ProjectionExec {
                 let normalized_exprs = sort_exprs
                     .iter()
                     .map(|sort_expr| {
-                        let expr = normalize_out_expr_with_alias_schema(
+                        let expr = normalize_out_expr_with_columns_map(
                             sort_expr.expr.clone(),
-                            &alias_map,
-                            &schema,
+                            &columns_map,
                         );
                         PhysicalSortExpr {
                             expr,
@@ -131,7 +128,7 @@ impl ProjectionExec {
             schema,
             input: input.clone(),
             output_ordering,
-            alias_map,
+            columns_map,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -178,13 +175,10 @@ impl ExecutionPlan for ProjectionExec {
                 let normalized_exprs = exprs
                     .into_iter()
                     .map(|expr| {
-                        normalize_out_expr_with_alias_schema(
-                            expr,
-                            &self.alias_map,
-                            &self.schema,
-                        )
+                        normalize_out_expr_with_columns_map(expr, 
&self.columns_map)
                     })
                     .collect::<Vec<_>>();
+
                 Partitioning::Hash(normalized_exprs, part)
             }
             _ => input_partition,
@@ -204,7 +198,7 @@ impl ExecutionPlan for ProjectionExec {
         let mut new_properties = EquivalenceProperties::new(self.schema());
         project_equivalence_properties(
             self.input.equivalence_properties(),
-            &self.alias_map,
+            &self.columns_map,
             &mut new_properties,
         );
         new_properties
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt 
b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 4693423b3a..0ecede8806 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1477,14 +1477,10 @@ SELECT + cor0.col0 AS col2 FROM tab1 AS cor0 GROUP BY 
cor0.col0
 28
 82
 
-# TODO: WRONG
-# https://github.com/apache/arrow-datafusion/issues/6099
 query I rowsort
 SELECT DISTINCT + 45 col0 FROM tab1 AS cor0 GROUP BY col0
 ----
 45
-45
-45
 
 query I rowsort label-211
 SELECT ALL CAST ( NULL AS INTEGER ) FROM tab2 AS cor0 GROUP BY col1
@@ -1914,13 +1910,10 @@ SELECT ALL - ( 30 ) * + cor0.col1 AS col2 FROM tab2 AS 
cor0 GROUP BY cor0.col1
 -1770
 -1830
 
-# TODO: WRONG
-# https://github.com/apache/arrow-datafusion/issues/6099
 query I rowsort
 SELECT DISTINCT 94 AS col1 FROM tab0 AS cor0 GROUP BY cor0.col1
 ----
 94
-94
 
 query I rowsort
 SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY cor0.col1
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index 84b177c4bf..5cbd40cc8c 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -56,7 +56,7 @@ pub use scalar_function::ScalarFunctionExpr;
 pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement};
 pub use utils::{
     expr_list_eq_any_order, expr_list_eq_strict_order,
-    normalize_expr_with_equivalence_properties, 
normalize_out_expr_with_alias_schema,
+    normalize_expr_with_equivalence_properties, 
normalize_out_expr_with_columns_map,
     normalize_sort_expr_with_equivalence_properties, 
sort_expr_list_eq_strict_order,
     split_conjunction,
 };
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index e459246692..472914f8d0 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -110,15 +110,14 @@ fn split_conjunction_impl<'a>(
     }
 }
 
-/// Normalize the output expressions based on Alias Map and SchemaRef.
+/// Normalize the output expressions based on Columns Map.
 ///
-/// 1) If there is mapping in Alias Map, replace the Column in the output 
expressions with the 1st Column in Alias Map
-/// 2) If the Column is invalid for the current Schema, replace the Column 
with a place holder UnKnownColumn
+/// If there is a mapping in Columns Map, replace the Column in the output 
expressions with the 1st Column in the Columns Map.
+/// Otherwise, replace the Column with a place holder of [UnKnownColumn]
 ///
-pub fn normalize_out_expr_with_alias_schema(
+pub fn normalize_out_expr_with_columns_map(
     expr: Arc<dyn PhysicalExpr>,
-    alias_map: &HashMap<Column, Vec<Column>>,
-    schema: &SchemaRef,
+    columns_map: &HashMap<Column, Vec<Column>>,
 ) -> Arc<dyn PhysicalExpr> {
     expr.clone()
         .transform(&|expr| {
@@ -126,16 +125,10 @@ pub fn normalize_out_expr_with_alias_schema(
                 .as_any()
                 .downcast_ref::<Column>()
             {
-                Some(column) => {
-                    alias_map
-                        .get(column)
-                        .map(|c| Arc::new(c[0].clone()) as _)
-                        .or_else(|| match schema.index_of(column.name()) {
-                            // Exactly matching, return None, no need to do 
the transform
-                            Ok(idx) if column.index() == idx => None,
-                            _ => 
Some(Arc::new(UnKnownColumn::new(column.name())) as _),
-                        })
-                }
+                Some(column) => columns_map
+                    .get(column)
+                    .map(|c| Arc::new(c[0].clone()) as _)
+                    .or_else(|| 
Some(Arc::new(UnKnownColumn::new(column.name())) as _)),
                 None => None,
             };
             Ok(if let Some(normalized_form) = normalized_form {

Reply via email to