This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 56c735c458 [MINOR]: Move some repetitive codes to functions (#9810)
56c735c458 is described below
commit 56c735c458c6d6dd7696941457dd4bbe95eaa2e0
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Mar 27 09:24:37 2024 +0300
[MINOR]: Move some repetitive codes to functions (#9810)
* Minor changes
* Accept both owned and reference
---
datafusion/core/src/datasource/memory.rs | 17 +++------
datafusion/core/src/physical_planner.rs | 63 ++++++++++++++------------------
datafusion/physical-expr/src/lib.rs | 2 +-
datafusion/physical-expr/src/planner.rs | 48 +++++++++++++-----------
4 files changed, 60 insertions(+), 70 deletions(-)
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 3c76ee6358..608a46144d 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -33,7 +33,7 @@ use crate::physical_plan::{
common, DisplayAs, DisplayFormatType, ExecutionPlan,
ExecutionPlanProperties,
Partitioning, SendableRecordBatchStream,
};
-use crate::physical_planner::create_physical_sort_expr;
+use crate::physical_planner::create_physical_sort_exprs;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -231,16 +231,11 @@ impl TableProvider for MemTable {
let file_sort_order = sort_order
.iter()
.map(|sort_exprs| {
- sort_exprs
- .iter()
- .map(|expr| {
- create_physical_sort_expr(
- expr,
- &df_schema,
- state.execution_props(),
- )
- })
- .collect::<Result<Vec<_>>>()
+ create_physical_sort_exprs(
+ sort_exprs,
+ &df_schema,
+ state.execution_props(),
+ )
})
.collect::<Result<Vec<_>>>()?;
exec = exec.with_sort_information(file_sort_order);
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index ca708b0582..deac3dcf46 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -43,7 +43,7 @@ use crate::logical_expr::{
Repartition, Union, UserDefinedLogicalNode,
};
use crate::logical_expr::{Limit, Values};
-use crate::physical_expr::create_physical_expr;
+use crate::physical_expr::{create_physical_expr, create_physical_exprs};
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
use crate::physical_plan::analyze::AnalyzeExec;
@@ -96,6 +96,7 @@ use datafusion_sql::utils::window_expr_common_partition_keys;
use async_trait::async_trait;
use datafusion_common::config::FormatOptions;
+use datafusion_physical_expr::LexOrdering;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
@@ -958,14 +959,7 @@ impl DefaultPhysicalPlanner {
LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
let physical_input = self.create_initial_plan(input,
session_state).await?;
let input_dfschema = input.as_ref().schema();
- let sort_expr = expr
- .iter()
- .map(|e| create_physical_sort_expr(
- e,
- input_dfschema,
- session_state.execution_props(),
- ))
- .collect::<Result<Vec<_>>>()?;
+ let sort_expr = create_physical_sort_exprs(expr,
input_dfschema, session_state.execution_props())?;
let new_sort = SortExec::new(sort_expr, physical_input)
.with_fetch(*fetch);
Ok(Arc::new(new_sort))
@@ -1592,18 +1586,11 @@ pub fn create_window_expr_with_name(
window_frame,
null_treatment,
}) => {
- let args = args
- .iter()
- .map(|e| create_physical_expr(e, logical_schema,
execution_props))
- .collect::<Result<Vec<_>>>()?;
- let partition_by = partition_by
- .iter()
- .map(|e| create_physical_expr(e, logical_schema,
execution_props))
- .collect::<Result<Vec<_>>>()?;
- let order_by = order_by
- .iter()
- .map(|e| create_physical_sort_expr(e, logical_schema,
execution_props))
- .collect::<Result<Vec<_>>>()?;
+ let args = create_physical_exprs(args, logical_schema,
execution_props)?;
+ let partition_by =
+ create_physical_exprs(partition_by, logical_schema,
execution_props)?;
+ let order_by =
+ create_physical_sort_exprs(order_by, logical_schema,
execution_props)?;
if !is_window_frame_bound_valid(window_frame) {
return plan_err!(
@@ -1670,10 +1657,8 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
order_by,
null_treatment,
}) => {
- let args = args
- .iter()
- .map(|e| create_physical_expr(e, logical_input_schema,
execution_props))
- .collect::<Result<Vec<_>>>()?;
+ let args =
+ create_physical_exprs(args, logical_input_schema,
execution_props)?;
let filter = match filter {
Some(e) => Some(create_physical_expr(
e,
@@ -1683,17 +1668,11 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
None => None,
};
let order_by = match order_by {
- Some(e) => Some(
- e.iter()
- .map(|expr| {
- create_physical_sort_expr(
- expr,
- logical_input_schema,
- execution_props,
- )
- })
- .collect::<Result<Vec<_>>>()?,
- ),
+ Some(e) => Some(create_physical_sort_exprs(
+ e,
+ logical_input_schema,
+ execution_props,
+ )?),
None => None,
};
let ignore_nulls = null_treatment
@@ -1780,6 +1759,18 @@ pub fn create_physical_sort_expr(
}
}
+/// Create a vector of physical sort expressions, one per input logical
expression
+pub fn create_physical_sort_exprs(
+ exprs: &[Expr],
+ input_dfschema: &DFSchema,
+ execution_props: &ExecutionProps,
+) -> Result<LexOrdering> {
+ exprs
+ .iter()
+ .map(|expr| create_physical_sort_expr(expr, input_dfschema,
execution_props))
+ .collect::<Result<Vec<_>>>()
+}
+
impl DefaultPhysicalPlanner {
/// Handles capturing the various plans for EXPLAIN queries
///
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 1791a6ed60..1dead09954 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -53,7 +53,7 @@ pub use physical_expr::{
physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
PhysicalExpr, PhysicalExprRef,
};
-pub use planner::create_physical_expr;
+pub use planner::{create_physical_expr, create_physical_exprs};
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::{
LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef,
PhysicalSortExpr,
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 241f01a417..319d9ca226 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -168,20 +168,15 @@ pub fn create_physical_expr(
} else {
None
};
- let when_expr = case
+ let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
.when_then_expr
.iter()
- .map(|(w, _)| {
- create_physical_expr(w.as_ref(), input_dfschema,
execution_props)
- })
- .collect::<Result<Vec<_>>>()?;
- let then_expr = case
- .when_then_expr
- .iter()
- .map(|(_, t)| {
- create_physical_expr(t.as_ref(), input_dfschema,
execution_props)
- })
- .collect::<Result<Vec<_>>>()?;
+ .map(|(w, t)| (w.as_ref(), t.as_ref()))
+ .unzip();
+ let when_expr =
+ create_physical_exprs(when_expr, input_dfschema,
execution_props)?;
+ let then_expr =
+ create_physical_exprs(then_expr, input_dfschema,
execution_props)?;
let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn
PhysicalExpr>)> =
when_expr
.iter()
@@ -248,10 +243,8 @@ pub fn create_physical_expr(
}
Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
- let physical_args = args
- .iter()
- .map(|e| create_physical_expr(e, input_dfschema,
execution_props))
- .collect::<Result<Vec<_>>>()?;
+ let physical_args =
+ create_physical_exprs(args, input_dfschema, execution_props)?;
match func_def {
ScalarFunctionDefinition::BuiltIn(fun) => {
@@ -310,12 +303,8 @@ pub fn create_physical_expr(
let value_expr =
create_physical_expr(expr, input_dfschema,
execution_props)?;
- let list_exprs = list
- .iter()
- .map(|expr| {
- create_physical_expr(expr, input_dfschema,
execution_props)
- })
- .collect::<Result<Vec<_>>>()?;
+ let list_exprs =
+ create_physical_exprs(list, input_dfschema,
execution_props)?;
expressions::in_list(value_expr, list_exprs, negated,
input_schema)
}
},
@@ -325,6 +314,21 @@ pub fn create_physical_expr(
}
}
+/// Create a vector of physical expressions from any iterable of `&Expr`
+pub fn create_physical_exprs<'a, I>(
+ exprs: I,
+ input_dfschema: &DFSchema,
+ execution_props: &ExecutionProps,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>>
+where
+ I: IntoIterator<Item = &'a Expr>,
+{
+ exprs
+ .into_iter()
+ .map(|expr| create_physical_expr(expr, input_dfschema,
execution_props))
+ .collect::<Result<Vec<_>>>()
+}
+
#[cfg(test)]
mod tests {
use super::*;