This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 56c735c458 [MINOR]: Move some repetitive codes to functions (#9810)
56c735c458 is described below

commit 56c735c458c6d6dd7696941457dd4bbe95eaa2e0
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Mar 27 09:24:37 2024 +0300

    [MINOR]: Move some repetitive codes to functions (#9810)
    
    * Minor changes
    
    * Accept both owned and reference
---
 datafusion/core/src/datasource/memory.rs | 17 +++------
 datafusion/core/src/physical_planner.rs  | 63 ++++++++++++++------------------
 datafusion/physical-expr/src/lib.rs      |  2 +-
 datafusion/physical-expr/src/planner.rs  | 48 +++++++++++++-----------
 4 files changed, 60 insertions(+), 70 deletions(-)

diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 3c76ee6358..608a46144d 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -33,7 +33,7 @@ use crate::physical_plan::{
     common, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
     Partitioning, SendableRecordBatchStream,
 };
-use crate::physical_planner::create_physical_sort_expr;
+use crate::physical_planner::create_physical_sort_exprs;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -231,16 +231,11 @@ impl TableProvider for MemTable {
             let file_sort_order = sort_order
                 .iter()
                 .map(|sort_exprs| {
-                    sort_exprs
-                        .iter()
-                        .map(|expr| {
-                            create_physical_sort_expr(
-                                expr,
-                                &df_schema,
-                                state.execution_props(),
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()
+                    create_physical_sort_exprs(
+                        sort_exprs,
+                        &df_schema,
+                        state.execution_props(),
+                    )
                 })
                 .collect::<Result<Vec<_>>>()?;
             exec = exec.with_sort_information(file_sort_order);
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index ca708b0582..deac3dcf46 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -43,7 +43,7 @@ use crate::logical_expr::{
     Repartition, Union, UserDefinedLogicalNode,
 };
 use crate::logical_expr::{Limit, Values};
-use crate::physical_expr::create_physical_expr;
+use crate::physical_expr::{create_physical_expr, create_physical_exprs};
 use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
 use crate::physical_plan::analyze::AnalyzeExec;
@@ -96,6 +96,7 @@ use datafusion_sql::utils::window_expr_common_partition_keys;
 
 use async_trait::async_trait;
 use datafusion_common::config::FormatOptions;
+use datafusion_physical_expr::LexOrdering;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use itertools::{multiunzip, Itertools};
@@ -958,14 +959,7 @@ impl DefaultPhysicalPlanner {
                 LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
                     let physical_input = self.create_initial_plan(input, session_state).await?;
                     let input_dfschema = input.as_ref().schema();
-                    let sort_expr = expr
-                        .iter()
-                        .map(|e| create_physical_sort_expr(
-                            e,
-                            input_dfschema,
-                            session_state.execution_props(),
-                        ))
-                        .collect::<Result<Vec<_>>>()?;
+                    let sort_expr = create_physical_sort_exprs(expr, input_dfschema, session_state.execution_props())?;
                     let new_sort = SortExec::new(sort_expr, physical_input)
                         .with_fetch(*fetch);
                     Ok(Arc::new(new_sort))
@@ -1592,18 +1586,11 @@ pub fn create_window_expr_with_name(
             window_frame,
             null_treatment,
         }) => {
-            let args = args
-                .iter()
-                .map(|e| create_physical_expr(e, logical_schema, execution_props))
-                .collect::<Result<Vec<_>>>()?;
-            let partition_by = partition_by
-                .iter()
-                .map(|e| create_physical_expr(e, logical_schema, execution_props))
-                .collect::<Result<Vec<_>>>()?;
-            let order_by = order_by
-                .iter()
-                .map(|e| create_physical_sort_expr(e, logical_schema, execution_props))
-                .collect::<Result<Vec<_>>>()?;
+            let args = create_physical_exprs(args, logical_schema, execution_props)?;
+            let partition_by =
+                create_physical_exprs(partition_by, logical_schema, execution_props)?;
+            let order_by =
+                create_physical_sort_exprs(order_by, logical_schema, execution_props)?;
 
             if !is_window_frame_bound_valid(window_frame) {
                 return plan_err!(
@@ -1670,10 +1657,8 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
             order_by,
             null_treatment,
         }) => {
-            let args = args
-                .iter()
-                .map(|e| create_physical_expr(e, logical_input_schema, execution_props))
-                .collect::<Result<Vec<_>>>()?;
+            let args =
+                create_physical_exprs(args, logical_input_schema, execution_props)?;
             let filter = match filter {
                 Some(e) => Some(create_physical_expr(
                     e,
@@ -1683,17 +1668,11 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
                 None => None,
             };
             let order_by = match order_by {
-                Some(e) => Some(
-                    e.iter()
-                        .map(|expr| {
-                            create_physical_sort_expr(
-                                expr,
-                                logical_input_schema,
-                                execution_props,
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()?,
-                ),
+                Some(e) => Some(create_physical_sort_exprs(
+                    e,
+                    logical_input_schema,
+                    execution_props,
+                )?),
                 None => None,
             };
             let ignore_nulls = null_treatment
@@ -1780,6 +1759,18 @@ pub fn create_physical_sort_expr(
     }
 }
 
+/// Create vector of physical sort expression from a vector of logical expression
+pub fn create_physical_sort_exprs(
+    exprs: &[Expr],
+    input_dfschema: &DFSchema,
+    execution_props: &ExecutionProps,
+) -> Result<LexOrdering> {
+    exprs
+        .iter()
+        .map(|expr| create_physical_sort_expr(expr, input_dfschema, execution_props))
+        .collect::<Result<Vec<_>>>()
+}
+
 impl DefaultPhysicalPlanner {
     /// Handles capturing the various plans for EXPLAIN queries
     ///
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 1791a6ed60..1dead09954 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -53,7 +53,7 @@ pub use physical_expr::{
     physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
     PhysicalExpr, PhysicalExprRef,
 };
-pub use planner::create_physical_expr;
+pub use planner::{create_physical_expr, create_physical_exprs};
 pub use scalar_function::ScalarFunctionExpr;
 pub use sort_expr::{
     LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalSortExpr,
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 241f01a417..319d9ca226 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -168,20 +168,15 @@ pub fn create_physical_expr(
             } else {
                 None
             };
-            let when_expr = case
+            let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
                 .when_then_expr
                 .iter()
-                .map(|(w, _)| {
-                    create_physical_expr(w.as_ref(), input_dfschema, execution_props)
-                })
-                .collect::<Result<Vec<_>>>()?;
-            let then_expr = case
-                .when_then_expr
-                .iter()
-                .map(|(_, t)| {
-                    create_physical_expr(t.as_ref(), input_dfschema, execution_props)
-                })
-                .collect::<Result<Vec<_>>>()?;
+                .map(|(w, t)| (w.as_ref(), t.as_ref()))
+                .unzip();
+            let when_expr =
+                create_physical_exprs(when_expr, input_dfschema, execution_props)?;
+            let then_expr =
+                create_physical_exprs(then_expr, input_dfschema, execution_props)?;
             let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
                 when_expr
                     .iter()
@@ -248,10 +243,8 @@ pub fn create_physical_expr(
         }
 
         Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
-            let physical_args = args
-                .iter()
-                .map(|e| create_physical_expr(e, input_dfschema, execution_props))
-                .collect::<Result<Vec<_>>>()?;
+            let physical_args =
+                create_physical_exprs(args, input_dfschema, execution_props)?;
 
             match func_def {
                 ScalarFunctionDefinition::BuiltIn(fun) => {
@@ -310,12 +303,8 @@ pub fn create_physical_expr(
                 let value_expr =
                     create_physical_expr(expr, input_dfschema, execution_props)?;
 
-                let list_exprs = list
-                    .iter()
-                    .map(|expr| {
-                        create_physical_expr(expr, input_dfschema, execution_props)
-                    })
-                    .collect::<Result<Vec<_>>>()?;
+                let list_exprs =
+                    create_physical_exprs(list, input_dfschema, execution_props)?;
                 expressions::in_list(value_expr, list_exprs, negated, input_schema)
             }
         },
@@ -325,6 +314,21 @@ pub fn create_physical_expr(
     }
 }
 
+/// Create vector of Physical Expression from a vector of logical expression
+pub fn create_physical_exprs<'a, I>(
+    exprs: I,
+    input_dfschema: &DFSchema,
+    execution_props: &ExecutionProps,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>>
+where
+    I: IntoIterator<Item = &'a Expr>,
+{
+    exprs
+        .into_iter()
+        .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
+        .collect::<Result<Vec<_>>>()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Reply via email to