This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f24e45f [nit] simplify datafusion optimizer module codes (#1146)
f24e45f is described below
commit f24e45fc8ec035e9ec0f6b6a18bb97e5bc0f9a1c
Author: Taehoon Moon <[email protected]>
AuthorDate: Wed Oct 20 03:15:40 2021 +0900
[nit] simplify datafusion optimizer module codes (#1146)
* [nit] simplify datafusion optimizer module codes
Replace `let mut` accumulator declarations with iterator `collect` in
`filter_push_down.rs` and `projection_push_down.rs`.
Add a `to_arrays` closure in `common_subexpr_eliminate.rs`.
* Change `to_arrays` from a closure to a `fn`
---
.../src/optimizer/common_subexpr_eliminate.rs | 56 ++++++++--------------
datafusion/src/optimizer/filter_push_down.rs | 24 ++++++----
datafusion/src/optimizer/projection_push_down.rs | 26 ++++------
3 files changed, 45 insertions(+), 61 deletions(-)
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 760067b..7192471 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -83,13 +83,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
schema,
alias,
} => {
- let mut arrays = vec![];
- for e in expr {
- let data_type = e.get_type(input.schema())?;
- let mut id_array = vec![];
- expr_to_identifier(e, &mut expr_set, &mut id_array, data_type)?;
- arrays.push(id_array);
- }
+ let arrays = to_arrays(expr, input, &mut expr_set)?;
let (mut new_expr, new_input) = rewrite_expr(
&[expr],
@@ -138,13 +132,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
window_expr,
schema,
} => {
- let mut arrays = vec![];
- for e in window_expr {
- let data_type = e.get_type(input.schema())?;
- let mut id_array = vec![];
- expr_to_identifier(e, &mut expr_set, &mut id_array, data_type)?;
- arrays.push(id_array);
- }
+ let arrays = to_arrays(window_expr, input, &mut expr_set)?;
let (mut new_expr, new_input) = rewrite_expr(
&[window_expr],
@@ -167,20 +155,8 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
aggr_expr,
schema,
} => {
- let mut group_arrays = vec![];
- for e in group_expr {
- let data_type = e.get_type(input.schema())?;
- let mut id_array = vec![];
- expr_to_identifier(e, &mut expr_set, &mut id_array, data_type)?;
- group_arrays.push(id_array);
- }
- let mut aggr_arrays = vec![];
- for e in aggr_expr {
- let data_type = e.get_type(input.schema())?;
- let mut id_array = vec![];
- expr_to_identifier(e, &mut expr_set, &mut id_array, data_type)?;
- aggr_arrays.push(id_array);
- }
+ let group_arrays = to_arrays(group_expr, input, &mut expr_set)?;
+ let aggr_arrays = to_arrays(aggr_expr, input, &mut expr_set)?;
let (mut new_expr, new_input) = rewrite_expr(
&[group_expr, aggr_expr],
@@ -202,13 +178,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
})
}
LogicalPlan::Sort { expr, input } => {
- let mut arrays = vec![];
- for e in expr {
- let data_type = e.get_type(input.schema())?;
- let mut id_array = vec![];
- expr_to_identifier(e, &mut expr_set, &mut id_array, data_type)?;
- arrays.push(id_array);
- }
+ let arrays = to_arrays(expr, input, &mut expr_set)?;
let (mut new_expr, new_input) = rewrite_expr(
&[expr],
@@ -248,6 +218,22 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
}
}
+fn to_arrays(
+ expr: &[Expr],
+ input: &LogicalPlan,
+ mut expr_set: &mut ExprSet,
+) -> Result<Vec<Vec<(usize, String)>>> {
+ expr.iter()
+ .map(|e| {
+ let data_type = e.get_type(input.schema())?;
+ let mut id_array = vec![];
+ expr_to_identifier(e, &mut expr_set, &mut id_array, data_type)?;
+
+ Ok(id_array)
+ })
+ .collect::<Result<Vec<_>>>()
+}
+
/// Build the "intermediate" projection plan that evaluates the extracted common expressions.
///
/// This projection plan will merge all fields in the `input.schema()` into its own schema.
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 1f82ca8..84265cb 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -322,16 +322,20 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
} => {
// A projection is filter-commutable, but re-writes all predicate expressions
// collect projection.
- let mut projection = HashMap::new();
- schema.fields().iter().enumerate().for_each(|(i, field)| {
- // strip alias, as they should not be part of filters
- let expr = match &expr[i] {
- Expr::Alias(expr, _) => expr.as_ref().clone(),
- expr => expr.clone(),
- };
-
- projection.insert(field.qualified_name(), expr);
- });
+ let projection = schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(i, field)| {
+ // strip alias, as they should not be part of filters
+ let expr = match &expr[i] {
+ Expr::Alias(expr, _) => expr.as_ref().clone(),
+ expr => expr.clone(),
+ };
+
+ (field.qualified_name(), expr)
+ })
+ .collect::<HashMap<_, _>>();
// re-write all filters based on this projection
// E.g. in `Filter: #b\n Projection: #a > 1 as b`, we can swap them, but the filter must be "#a > 1"
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index aba248c..58fde1d 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -105,22 +105,16 @@ fn get_projected_schema(
}
// create the projected schema
- let mut projected_fields: Vec<DFField> = Vec::with_capacity(projection.len());
- match table_name {
- Some(qualifer) => {
- for i in &projection {
- projected_fields.push(DFField::from_qualified(
- qualifer,
- schema.fields()[*i].clone(),
- ));
- }
- }
- None => {
- for i in &projection {
- projected_fields.push(DFField::from(schema.fields()[*i].clone()));
- }
- }
- }
+ let projected_fields: Vec<DFField> = match table_name {
+ Some(qualifer) => projection
+ .iter()
+ .map(|i| DFField::from_qualified(qualifer, schema.fields()[*i].clone()))
+ .collect(),
+ None => projection
+ .iter()
+ .map(|i| DFField::from(schema.fields()[*i].clone()))
+ .collect(),
+ };
let projection = projection.into_iter().collect::<Vec<_>>();
Ok((projection, projected_fields.to_dfschema_ref()?))