This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a4233adf84 Named window support (#6419)
a4233adf84 is described below

commit a4233adf8438376363fae6bacb75131400c11ab4
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu May 25 11:39:08 2023 +0300

    Named window support (#6419)
    
    * named window statements can be planned
    
    * Remove unnecessary clone
    
    * Semantic check is added, avro error fix PR is waiting
    
    * SQLParser updates are handled better, a test is added.
    
    * Fix document error
    
    * Minor changes
    
    * UnnamedExpr bug fix
    
    * Refactor named window handling in select_to_plan()
    
    * Remove unnecessary clone
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
    Co-authored-by: Mustafa Akur <[email protected]>
---
 .../tests/sqllogictests/test_files/groupby.slt     |  35 +++++++
 .../core/tests/sqllogictests/test_files/window.slt | 113 +++++++++++++++++++++
 datafusion/sql/src/expr/function.rs                |  21 +---
 datafusion/sql/src/expr/mod.rs                     |  26 +----
 datafusion/sql/src/expr/order_by.rs                |  68 +++++++------
 datafusion/sql/src/query.rs                        |   7 +-
 datafusion/sql/src/select.rs                       |  57 ++++++++++-
 datafusion/sql/src/statement.rs                    |   5 +-
 8 files changed, 246 insertions(+), 86 deletions(-)

diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt 
b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 79520ff734..b81731e64c 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2214,3 +2214,38 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount 
DESC) AS amounts,
 FRA [200.0, 50.0] 250
 GRC [80.0, 30.0] 110
 TUR [100.0, 75.0] 175
+
+# test_ordering_sensitive_aggregation7
+# Lexicographical ordering requirement can be given as 
+# argument to the aggregate functions
+query TT
+EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount 
DESC) AS amounts,
+        SUM(s.amount) AS sum1
+          FROM (SELECT *
+            FROM sales_global
+            ORDER BY country) AS s
+          GROUP BY s.country
+----
+logical_plan
+Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS 
FIRST, s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(s.amount)]]
+----SubqueryAlias: s
+------Sort: sales_global.country ASC NULLS LAST
+--------TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, 
SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
+----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?R
+SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) 
AS amounts,
+  SUM(s.amount) AS sum1
+    FROM (SELECT *
+      FROM sales_global
+      ORDER BY country) AS s
+    GROUP BY s.country
+----
+FRA [200.0, 50.0] 250
+GRC [80.0, 30.0] 110
+TUR [100.0, 75.0] 175
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index b15bc90fb1..32f45dbb57 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -3023,3 +3023,116 @@ drop table annotated_data_finite2
 
 statement ok
 drop table annotated_data_infinite2
+
+# window3 spec is not used in window functions.
+# The query should still work.
+query RR
+SELECT
+  MAX(c12) OVER window1,
+  MIN(c12) OVER window2 as max1
+  FROM aggregate_test_100
+  WINDOW window1 AS (ORDER BY C12),
+  window2 AS (PARTITION BY C11),
+  window3 AS (ORDER BY C1)
+  ORDER BY C3
+  LIMIT 5
+----
+0.970671228336 0.970671228336
+0.850672105305 0.850672105305
+0.152498292972 0.152498292972
+0.369363046006 0.369363046006
+0.56535284223 0.56535284223
+
+query TT
+EXPLAIN SELECT
+  MAX(c12) OVER window1 as min1,
+  MIN(c12) OVER window2 as max1
+  FROM aggregate_test_100
+  WINDOW window1 AS (ORDER BY C12),
+  window2 AS (PARTITION BY C11),
+  window3 AS (ORDER BY C1)
+  ORDER BY C3
+  LIMIT 5
+----
+logical_plan
+Projection: min1, max1
+--Limit: skip=0, fetch=5
+----Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
+------Projection: MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS min1, 
MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max1, aggregate_test_100.c3
+--------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
+----------Projection: aggregate_test_100.c3, aggregate_test_100.c12, 
MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+------------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c12) PARTITION BY 
[aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING]]
+--------------TableScan: aggregate_test_100 projection=[c3, c11, c12]
+physical_plan
+ProjectionExec: expr=[min1@0 as min1, max1@1 as max1]
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST]
+------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@3 as min1, MIN(aggregate_test_100.c12) PARTITION BY 
[aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@2 as max1, c3@0 as c3]
+--------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12): Ok(Field { 
name: "MAX(aggregate_test_100.c12)", data_type: Float64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
+----------SortExec: expr=[c12@1 ASC NULLS LAST]
+------------ProjectionExec: expr=[c3@0 as c3, c12@2 as c12, 
MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as MIN(aggregate_test_100.c12)]
+--------------WindowAggExec: wdw=[MIN(aggregate_test_100.c12): Ok(Field { 
name: "MIN(aggregate_test_100.c12)", data_type: Float64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]
+----------------SortExec: expr=[c11@1 ASC NULLS LAST]
+------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, 
c11, c12], has_header=true
+
+# window1 spec is used multiple times under different aggregations.
+# The query should still work.
+query RR
+SELECT
+  MAX(c12) OVER window1 as min1,
+  MIN(c12) OVER window1 as max1
+  FROM aggregate_test_100
+  WINDOW window1 AS (ORDER BY C12)
+  ORDER BY C3
+  LIMIT 5
+----
+0.970671228336 0.014793053078
+0.850672105305 0.014793053078
+0.152498292972 0.014793053078
+0.369363046006 0.014793053078
+0.56535284223 0.014793053078
+
+query TT
+EXPLAIN SELECT
+  MAX(c12) OVER window1 as min1,
+  MIN(c12) OVER window1 as max1
+  FROM aggregate_test_100
+  WINDOW window1 AS (ORDER BY C12)
+  ORDER BY C3
+  LIMIT 5
+----
+logical_plan
+Projection: min1, max1
+--Limit: skip=0, fetch=5
+----Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
+------Projection: MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS min1, 
MIN(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS max1, aggregate_test_100.c3
+--------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW, MIN(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c3, c12]
+physical_plan
+ProjectionExec: expr=[min1@0 as min1, max1@1 as max1]
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST]
+------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@2 as min1, MIN(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@3 as max1, c3@0 as c3]
+--------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12): Ok(Field { 
name: "MAX(aggregate_test_100.c12)", data_type: Float64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }, 
MIN(aggregate_test_100.c12): Ok(Field { name: "MIN(aggregate_test_100.c12)", 
data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units [...]
+----------SortExec: expr=[c12@1 ASC NULLS LAST]
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, 
c12], has_header=true
+
+# window2 spec is not defined
+statement error DataFusion error: Error during planning: The window window2 is 
not defined!
+SELECT
+  MAX(c12) OVER window1 as min1,
+  MIN(c12) OVER window2 as max1
+  FROM aggregate_test_100
+  WINDOW window1 AS (ORDER BY C12)
+  ORDER BY C3
+  LIMIT 5
+
+# window1 spec is defined multiple times
+statement error DataFusion error: Error during planning: The window window1 is 
defined multiple times!
+SELECT
+  MAX(c12) OVER window1 as min1,
+  FROM aggregate_test_100
+  WINDOW window1 AS (ORDER BY C12),
+  window1 AS (ORDER BY C3)
+  ORDER BY C3
+  LIMIT 5
\ No newline at end of file
diff --git a/datafusion/sql/src/expr/function.rs 
b/datafusion/sql/src/expr/function.rs
index bf69c381cd..0c5b460ead 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -54,29 +54,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         };
 
         // then, window function
-        let window = function
-            .over
-            .take()
-            // TODO support named windows
-            .map(|window| match window {
-                WindowType::WindowSpec(window_spec) => Ok(window_spec),
-                WindowType::NamedWindow(name) => 
Err(DataFusionError::NotImplemented(
-                    format!("Named windows ({name}) are not supported"),
-                )),
-            })
-            .transpose()?;
-
-        if let Some(window) = window {
+        if let Some(WindowType::WindowSpec(window)) = function.over.take() {
             let partition_by = window
                 .partition_by
                 .into_iter()
                 .map(|e| self.sql_expr_to_logical_expr(e, schema, 
planner_context))
                 .collect::<Result<Vec<_>>>()?;
-            let order_by = window
-                .order_by
-                .into_iter()
-                .map(|e| self.order_by_to_sort_expr(e, schema, 
planner_context))
-                .collect::<Result<Vec<_>>>()?;
+            let order_by =
+                self.order_by_to_sort_expr(&window.order_by, schema, 
planner_context)?;
             let window_frame = window
                 .window_frame
                 .as_ref()
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index 05136a4a06..66422aa43f 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -167,7 +167,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 leading_precision,
                 last_field,
                 fractional_seconds_precision,
-            }) => self.sql_interval_to_expr(
+            })=> self.sql_interval_to_expr(
                 *value,
                 schema,
                 planner_context,
@@ -350,28 +350,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             within_group,
         } = array_agg;
 
-        let order_by = if let Some(mut order_by) = order_by {
-            // TODO: Once sqlparser supports multiple order by clause, handle 
it
-            //       see issue: 
https://github.com/sqlparser-rs/sqlparser-rs/issues/875
-            let order_by = match order_by.len() {
-                0 => {
-                    return Err(DataFusionError::NotImplemented(
-                        "ARRAY_AGG with empty ORDER BY not 
supported".to_string(),
-                    ))
-                }
-                1 => order_by.pop().unwrap(),
-                n => {
-                    return Err(DataFusionError::NotImplemented(format!(
-                        "ARRAY_AGG only supports a single ORDER BY expression. 
Got {n}"
-                    )))
-                }
-            };
-
-            Some(vec![self.order_by_to_sort_expr(
-                order_by,
-                input_schema,
-                planner_context,
-            )?])
+        let order_by = if let Some(order_by) = order_by {
+            Some(self.order_by_to_sort_expr(&order_by, input_schema, 
planner_context)?)
         } else {
             None
         };
diff --git a/datafusion/sql/src/expr/order_by.rs 
b/datafusion/sql/src/expr/order_by.rs
index d8c0e34f34..b32388f1bc 100644
--- a/datafusion/sql/src/expr/order_by.rs
+++ b/datafusion/sql/src/expr/order_by.rs
@@ -22,51 +22,53 @@ use datafusion_expr::Expr;
 use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
 
 impl<'a, S: ContextProvider> SqlToRel<'a, S> {
-    /// convert sql OrderByExpr to Expr::Sort
+    /// convert sql [OrderByExpr] to `Vec<Expr>`
     pub(crate) fn order_by_to_sort_expr(
         &self,
-        e: OrderByExpr,
+        exprs: &[OrderByExpr],
         schema: &DFSchema,
         planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let OrderByExpr {
-            asc,
-            expr,
-            nulls_first,
-        } = e;
+    ) -> Result<Vec<Expr>> {
+        let mut expr_vec = vec![];
+        for e in exprs {
+            let OrderByExpr {
+                asc,
+                expr,
+                nulls_first,
+            } = e;
 
-        let expr = match expr {
-            SQLExpr::Value(Value::Number(v, _)) => {
-                let field_index = v
-                    .parse::<usize>()
-                    .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+            let expr = match expr {
+                SQLExpr::Value(Value::Number(v, _)) => {
+                    let field_index = v
+                        .parse::<usize>()
+                        .map_err(|err| 
DataFusionError::Plan(err.to_string()))?;
 
-                if field_index == 0 {
-                    return Err(DataFusionError::Plan(
-                        "Order by index starts at 1 for column 
indexes".to_string(),
-                    ));
-                } else if schema.fields().len() < field_index {
-                    return Err(DataFusionError::Plan(format!(
-                        "Order by column out of bounds, specified: {}, max: 
{}",
-                        field_index,
-                        schema.fields().len()
-                    )));
-                }
+                    if field_index == 0 {
+                        return Err(DataFusionError::Plan(
+                            "Order by index starts at 1 for column 
indexes".to_string(),
+                        ));
+                    } else if schema.fields().len() < field_index {
+                        return Err(DataFusionError::Plan(format!(
+                            "Order by column out of bounds, specified: {}, 
max: {}",
+                            field_index,
+                            schema.fields().len()
+                        )));
+                    }
 
-                let field = schema.field(field_index - 1);
-                Expr::Column(field.qualified_column())
-            }
-            e => self.sql_expr_to_logical_expr(e, schema, planner_context)?,
-        };
-        Ok({
+                    let field = schema.field(field_index - 1);
+                    Expr::Column(field.qualified_column())
+                }
+                e => self.sql_expr_to_logical_expr(e.clone(), schema, 
planner_context)?,
+            };
             let asc = asc.unwrap_or(true);
-            Expr::Sort(Sort::new(
+            expr_vec.push(Expr::Sort(Sort::new(
                 Box::new(expr),
                 asc,
                 // when asc is true, by default nulls last to be consistent 
with postgres
                 // postgres rule: 
https://www.postgresql.org/docs/current/queries-order.html
                 nulls_first.unwrap_or(!asc),
-            ))
-        })
+            )))
+        }
+        Ok(expr_vec)
     }
 }
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index ea1f72e76c..27235a050d 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -143,11 +143,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             return Ok(plan);
         }
 
-        let order_by_rex = order_by
-            .into_iter()
-            .map(|e| self.order_by_to_sort_expr(e, plan.schema(), 
planner_context))
-            .collect::<Result<Vec<_>>>()?;
-
+        let order_by_rex =
+            self.order_by_to_sort_expr(&order_by, plan.schema(), 
planner_context)?;
         LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
     }
 }
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 2acc2c7a8b..01bd740bf2 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -34,8 +34,9 @@ use datafusion_expr::{
     CreateMemoryTable, DdlStatement, Expr, Filter, GroupingSet, LogicalPlan,
     LogicalPlanBuilder, Partitioning,
 };
-use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions};
-use sqlparser::ast::{Select, SelectItem, TableWithJoins};
+
+use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions, 
WindowType};
+use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, 
TableWithJoins};
 use std::collections::HashSet;
 use std::sync::Arc;
 
@@ -43,7 +44,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     /// Generate a logic plan from an SQL select
     pub(super) fn select_to_plan(
         &self,
-        select: Select,
+        mut select: Select,
         planner_context: &mut PlannerContext,
     ) -> Result<LogicalPlan> {
         // check for unsupported syntax first
@@ -70,6 +71,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // process `where` clause
         let plan = self.plan_selection(select.selection, plan, 
planner_context)?;
 
+        // handle named windows before processing the projection expression
+        check_conflicting_windows(&select.named_window)?;
+        match_window_definitions(&mut select.projection, 
&select.named_window)?;
+
         // process the SELECT expressions, with wildcards expanded.
         let select_exprs = self.prepare_select_exprs(
             &plan,
@@ -520,3 +525,49 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         Ok((plan, select_exprs_post_aggr, having_expr_post_aggr))
     }
 }
+
+// If there are any multiple-defined windows, we raise an error.
+fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> 
Result<()> {
+    for (i, window_def_i) in window_defs.iter().enumerate() {
+        for window_def_j in window_defs.iter().skip(i + 1) {
+            if window_def_i.0 == window_def_j.0 {
+                return Err(DataFusionError::Plan(format!(
+                    "The window {} is defined multiple times!",
+                    window_def_i.0
+                )));
+            }
+        }
+    }
+    Ok(())
+}
+
+// If the projection is done over a named window, that window
+// name must be defined. Otherwise, it gives an error.
+fn match_window_definitions(
+    projection: &mut [SelectItem],
+    named_windows: &[NamedWindowDefinition],
+) -> Result<()> {
+    for proj in projection.iter_mut() {
+        if let SelectItem::ExprWithAlias {
+            expr: SQLExpr::Function(f),
+            alias: _,
+        }
+        | SelectItem::UnnamedExpr(SQLExpr::Function(f)) = proj
+        {
+            for NamedWindowDefinition(window_ident, window_spec) in 
named_windows.iter() {
+                if let Some(WindowType::NamedWindow(ident)) = &f.over {
+                    if ident.eq(window_ident) {
+                        f.over = 
Some(WindowType::WindowSpec(window_spec.clone()))
+                    }
+                }
+            }
+            // All named windows must be defined with a WindowSpec.
+            if let Some(WindowType::NamedWindow(ident)) = &f.over {
+                return Err(DataFusionError::Plan(format!(
+                    "The window {ident} is not defined!"
+                )));
+            }
+        }
+    }
+    Ok(())
+}
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index e5ee3d43d9..dada4b147b 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -590,10 +590,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             ));
         }
         // Convert each OrderByExpr to a SortExpr:
-        let result = order_exprs
-            .into_iter()
-            .map(|e| self.order_by_to_sort_expr(e, schema, planner_context))
-            .collect::<Result<Vec<_>>>()?;
+        let result = self.order_by_to_sort_expr(&order_exprs, schema, 
planner_context)?;
         // Verify that columns of all SortExprs exist in the schema:
         for expr in result.iter() {
             for column in expr.to_columns()?.iter() {

Reply via email to