This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7aad6c7f45 Fix column mapping in output_ordering() and output_partitioning() for ProjectionExec and AggregateExec (#6113)
7aad6c7f45 is described below
commit 7aad6c7f45657f6594552e0244aa141a76d7448d
Author: mingmwang <[email protected]>
AuthorDate: Tue Apr 25 22:45:55 2023 +0800
Fix column mapping in output_ordering() and output_partitioning() for ProjectionExec and AggregateExec (#6113)
* Fix column mapping in output_ordering() and output_partitioning() for ProjectionExec and AggregateExec
* tiny
* remove issue link
---
.../core/src/physical_plan/aggregates/mod.rs | 25 ++++++++---------
datafusion/core/src/physical_plan/projection.rs | 32 +++++++++-------------
.../tests/sqllogictests/test_files/groupby.slt | 7 -----
datafusion/physical-expr/src/lib.rs | 2 +-
datafusion/physical-expr/src/utils.rs | 25 ++++++-----------
5 files changed, 34 insertions(+), 57 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 3ec9a6d712..57525b387f 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -50,7 +50,7 @@ pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
use datafusion_physical_expr::equivalence::project_equivalence_properties;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
-use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
+use datafusion_physical_expr::normalize_out_expr_with_columns_map;
/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -203,9 +203,9 @@ pub struct AggregateExec {
/// same as input.schema() but for the final aggregate it will be the same
as the input
/// to the partial aggregate
pub(crate) input_schema: SchemaRef,
- /// The alias map used to normalize out expressions like Partitioning and
PhysicalSortExpr
+ /// The columns map used to normalize out expressions like Partitioning
and PhysicalSortExpr
/// The key is the column from the input schema and the values are the
columns from the output schema
- alias_map: HashMap<Column, Vec<Column>>,
+ columns_map: HashMap<Column, Vec<Column>>,
/// Execution Metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -230,15 +230,13 @@ impl AggregateExec {
let schema = Arc::new(schema);
- let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
+ // construct a map from the input columns to the output columns of the
Aggregation
+ let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
for (expression, name) in group_by.expr.iter() {
if let Some(column) = expression.as_any().downcast_ref::<Column>()
{
let new_col_idx = schema.index_of(name)?;
- // When the column name is the same, but index does not equal,
treat it as Alias
- if (column.name() != name) || (column.index() != new_col_idx) {
- let entry =
alias_map.entry(column.clone()).or_insert_with(Vec::new);
- entry.push(Column::new(name, new_col_idx));
- }
+ let entry =
columns_map.entry(column.clone()).or_insert_with(Vec::new);
+ entry.push(Column::new(name, new_col_idx));
};
}
@@ -250,7 +248,7 @@ impl AggregateExec {
input,
schema,
input_schema,
- alias_map,
+ columns_map,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -358,10 +356,9 @@ impl ExecutionPlan for AggregateExec {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
- normalize_out_expr_with_alias_schema(
+ normalize_out_expr_with_columns_map(
expr,
- &self.alias_map,
- &self.schema,
+ &self.columns_map,
)
})
.collect::<Vec<_>>();
@@ -408,7 +405,7 @@ impl ExecutionPlan for AggregateExec {
let mut new_properties = EquivalenceProperties::new(self.schema());
project_equivalence_properties(
self.input.equivalence_properties(),
- &self.alias_map,
+ &self.columns_map,
&mut new_properties,
);
new_properties
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 95d33fbcd3..799de0c191 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -40,7 +40,7 @@ use super::metrics::{BaselineMetrics,
ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use crate::execution::context::TaskContext;
use datafusion_physical_expr::equivalence::project_equivalence_properties;
-use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
+use datafusion_physical_expr::normalize_out_expr_with_columns_map;
use futures::stream::Stream;
use futures::stream::StreamExt;
@@ -55,9 +55,9 @@ pub struct ProjectionExec {
input: Arc<dyn ExecutionPlan>,
/// The output ordering
output_ordering: Option<Vec<PhysicalSortExpr>>,
- /// The alias map used to normalize out expressions like Partitioning and
PhysicalSortExpr
+ /// The columns map used to normalize out expressions like Partitioning
and PhysicalSortExpr
/// The key is the column from the input schema and the values are the
columns from the output schema
- alias_map: HashMap<Column, Vec<Column>>,
+ columns_map: HashMap<Column, Vec<Column>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -91,15 +91,13 @@ impl ProjectionExec {
input_schema.metadata().clone(),
));
- let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
+ // construct a map from the input columns to the output columns of the
Projection
+ let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
for (expression, name) in expr.iter() {
if let Some(column) = expression.as_any().downcast_ref::<Column>()
{
let new_col_idx = schema.index_of(name)?;
- // When the column name is the same, but index does not equal,
treat it as Alias
- if (column.name() != name) || (column.index() != new_col_idx) {
- let entry =
alias_map.entry(column.clone()).or_insert_with(Vec::new);
- entry.push(Column::new(name, new_col_idx));
- }
+ let entry =
columns_map.entry(column.clone()).or_insert_with(Vec::new);
+ entry.push(Column::new(name, new_col_idx));
};
}
@@ -110,10 +108,9 @@ impl ProjectionExec {
let normalized_exprs = sort_exprs
.iter()
.map(|sort_expr| {
- let expr = normalize_out_expr_with_alias_schema(
+ let expr = normalize_out_expr_with_columns_map(
sort_expr.expr.clone(),
- &alias_map,
- &schema,
+ &columns_map,
);
PhysicalSortExpr {
expr,
@@ -131,7 +128,7 @@ impl ProjectionExec {
schema,
input: input.clone(),
output_ordering,
- alias_map,
+ columns_map,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -178,13 +175,10 @@ impl ExecutionPlan for ProjectionExec {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
- normalize_out_expr_with_alias_schema(
- expr,
- &self.alias_map,
- &self.schema,
- )
+ normalize_out_expr_with_columns_map(expr,
&self.columns_map)
})
.collect::<Vec<_>>();
+
Partitioning::Hash(normalized_exprs, part)
}
_ => input_partition,
@@ -204,7 +198,7 @@ impl ExecutionPlan for ProjectionExec {
let mut new_properties = EquivalenceProperties::new(self.schema());
project_equivalence_properties(
self.input.equivalence_properties(),
- &self.alias_map,
+ &self.columns_map,
&mut new_properties,
);
new_properties
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 4693423b3a..0ecede8806 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1477,14 +1477,10 @@ SELECT + cor0.col0 AS col2 FROM tab1 AS cor0 GROUP BY
cor0.col0
28
82
-# TODO: WRONG
-# https://github.com/apache/arrow-datafusion/issues/6099
query I rowsort
SELECT DISTINCT + 45 col0 FROM tab1 AS cor0 GROUP BY col0
----
45
-45
-45
query I rowsort label-211
SELECT ALL CAST ( NULL AS INTEGER ) FROM tab2 AS cor0 GROUP BY col1
@@ -1914,13 +1910,10 @@ SELECT ALL - ( 30 ) * + cor0.col1 AS col2 FROM tab2 AS
cor0 GROUP BY cor0.col1
-1770
-1830
-# TODO: WRONG
-# https://github.com/apache/arrow-datafusion/issues/6099
query I rowsort
SELECT DISTINCT 94 AS col1 FROM tab0 AS cor0 GROUP BY cor0.col1
----
94
-94
query I rowsort
SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY cor0.col1
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 84b177c4bf..5cbd40cc8c 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -56,7 +56,7 @@ pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
- normalize_expr_with_equivalence_properties,
normalize_out_expr_with_alias_schema,
+ normalize_expr_with_equivalence_properties,
normalize_out_expr_with_columns_map,
normalize_sort_expr_with_equivalence_properties,
sort_expr_list_eq_strict_order,
split_conjunction,
};
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index e459246692..472914f8d0 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -110,15 +110,14 @@ fn split_conjunction_impl<'a>(
}
}
-/// Normalize the output expressions based on Alias Map and SchemaRef.
+/// Normalize the output expressions based on Columns Map.
///
-/// 1) If there is mapping in Alias Map, replace the Column in the output
expressions with the 1st Column in Alias Map
-/// 2) If the Column is invalid for the current Schema, replace the Column
with a place holder UnKnownColumn
+/// If there is a mapping in Columns Map, replace the Column in the output
expressions with the 1st Column in the Columns Map.
+/// Otherwise, replace the Column with a place holder of [UnKnownColumn]
///
-pub fn normalize_out_expr_with_alias_schema(
+pub fn normalize_out_expr_with_columns_map(
expr: Arc<dyn PhysicalExpr>,
- alias_map: &HashMap<Column, Vec<Column>>,
- schema: &SchemaRef,
+ columns_map: &HashMap<Column, Vec<Column>>,
) -> Arc<dyn PhysicalExpr> {
expr.clone()
.transform(&|expr| {
@@ -126,16 +125,10 @@ pub fn normalize_out_expr_with_alias_schema(
.as_any()
.downcast_ref::<Column>()
{
- Some(column) => {
- alias_map
- .get(column)
- .map(|c| Arc::new(c[0].clone()) as _)
- .or_else(|| match schema.index_of(column.name()) {
- // Exactly matching, return None, no need to do
the transform
- Ok(idx) if column.index() == idx => None,
- _ =>
Some(Arc::new(UnKnownColumn::new(column.name())) as _),
- })
- }
+ Some(column) => columns_map
+ .get(column)
+ .map(|c| Arc::new(c[0].clone()) as _)
+ .or_else(||
Some(Arc::new(UnKnownColumn::new(column.name())) as _)),
None => None,
};
Ok(if let Some(normalized_form) = normalized_form {