This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 439339a651 fix: Literal in `ORDER BY` window definition should not be an ordinal referring to relation column (#8419)
439339a651 is described below

commit 439339a6519f48b672615ce6acac8d48b8be4b8f
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Dec 6 13:45:28 2023 -0800

    fix: Literal in `ORDER BY` window definition should not be an ordinal referring to relation column (#8419)
    
    * fix: RANGE frame can be regularized to ROWS frame only if empty ORDER BY clause
    
    * Fix flaky test
    
    * Update test comment
    
    * Add code comment
    
    * Update
    
    * fix: Literal in window definition should not refer to relation column
    
    * Remove unused import
    
    * Update datafusion/sql/src/expr/function.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Add code comment
    
    * Fix format
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/physical-expr/src/sort_expr.rs                    |  8 ++------
 .../physical-plan/src/windows/bounded_window_agg_exec.rs     |  4 ++--
 datafusion/sql/src/expr/function.rs                          | 11 ++++++++---
 datafusion/sql/src/expr/mod.rs                               |  7 ++++++-
 datafusion/sql/src/expr/order_by.rs                          |  9 +++++++--
 datafusion/sql/src/query.rs                                  |  2 +-
 datafusion/sql/src/statement.rs                              |  3 ++-
 datafusion/sqllogictest/test_files/window.slt                | 12 ++++++------
 8 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index 664a6b65b7..914d76f926 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -26,7 +26,7 @@ use crate::PhysicalExpr;
 use arrow::compute::kernels::sort::{SortColumn, SortOptions};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
-use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_common::Result;
 use datafusion_expr::ColumnarValue;
 
 /// Represents Sort operation for a column in a RecordBatch
@@ -65,11 +65,7 @@ impl PhysicalSortExpr {
         let value_to_sort = self.expr.evaluate(batch)?;
         let array_to_sort = match value_to_sort {
             ColumnarValue::Array(array) => array,
-            ColumnarValue::Scalar(scalar) => {
-                return exec_err!(
-                    "Sort operation is not applicable to scalar value {scalar}"
-                );
-            }
+            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
         };
         Ok(SortColumn {
             values: array_to_sort,
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 9e4d6c1370..f988b28cce 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -51,7 +51,7 @@ use datafusion_common::utils::{
     evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
     get_record_batch_at_indices, get_row_at_idx,
 };
-use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
+use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
 use datafusion_expr::ColumnarValue;
@@ -585,7 +585,7 @@ impl LinearSearch {
             .map(|item| match item.evaluate(record_batch)? {
                 ColumnarValue::Array(array) => Ok(array),
                 ColumnarValue::Scalar(scalar) => {
-                    plan_err!("Sort operation is not applicable to scalar value {scalar}")
+                    scalar.to_array_of_size(record_batch.num_rows())
                 }
             })
             .collect()
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 958e038798..14ea20c3fa 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -92,8 +92,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .into_iter()
                 .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
                 .collect::<Result<Vec<_>>>()?;
-            let order_by =
-                self.order_by_to_sort_expr(&window.order_by, schema, planner_context)?;
+            let order_by = self.order_by_to_sort_expr(
+                &window.order_by,
+                schema,
+                planner_context,
+                // Numeric literals in window function ORDER BY are treated as constants
+                false,
+            )?;
             let window_frame = window
                 .window_frame
                 .as_ref()
@@ -143,7 +148,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             // next, aggregate built-ins
             if let Ok(fun) = AggregateFunction::from_str(&name) {
                 let order_by =
-                    self.order_by_to_sort_expr(&order_by, schema, planner_context)?;
+                    self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?;
                 let order_by = (!order_by.is_empty()).then_some(order_by);
                 let args = self.function_args_to_expr(args, schema, planner_context)?;
                 let filter: Option<Box<Expr>> = filter
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index 2f44466c79..27351e10eb 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -555,7 +555,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         } = array_agg;
 
         let order_by = if let Some(order_by) = order_by {
-            Some(self.order_by_to_sort_expr(&order_by, input_schema, planner_context)?)
+            Some(self.order_by_to_sort_expr(
+                &order_by,
+                input_schema,
+                planner_context,
+                true,
+            )?)
         } else {
             None
         };
diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs
index 1dccc2376f..772255bd97 100644
--- a/datafusion/sql/src/expr/order_by.rs
+++ b/datafusion/sql/src/expr/order_by.rs
@@ -24,12 +24,17 @@ use datafusion_expr::Expr;
 use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
 
 impl<'a, S: ContextProvider> SqlToRel<'a, S> {
-    /// convert sql [OrderByExpr] to `Vec<Expr>`
+    /// Convert sql [OrderByExpr] to `Vec<Expr>`.
+    ///
+    /// If `literal_to_column` is true, treat any numeric literals (e.g. `2`) as a 1 based index
+    /// into the SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`).
+    /// If false, interpret numeric literals as constant values.
     pub(crate) fn order_by_to_sort_expr(
         &self,
         exprs: &[OrderByExpr],
         schema: &DFSchema,
         planner_context: &mut PlannerContext,
+        literal_to_column: bool,
     ) -> Result<Vec<Expr>> {
         let mut expr_vec = vec![];
         for e in exprs {
@@ -40,7 +45,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             } = e;
 
             let expr = match expr {
-                SQLExpr::Value(Value::Number(v, _)) => {
+                SQLExpr::Value(Value::Number(v, _)) if literal_to_column => {
                     let field_index = v
                         .parse::<usize>()
                         .map_err(|err| plan_datafusion_err!("{}", err))?;
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index 643f41d844..dd4cab1262 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -161,7 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
 
         let order_by_rex =
-            self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context)?;
+            self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?;
 
         if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
             // In case of `DISTINCT ON` we must capture the sort expressions since during the plan
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index a64010a7c3..4220e83316 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -710,7 +710,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let mut all_results = vec![];
         for expr in order_exprs {
             // Convert each OrderByExpr to a SortExpr:
-            let expr_vec = self.order_by_to_sort_expr(&expr, schema, planner_context)?;
+            let expr_vec =
+                self.order_by_to_sort_expr(&expr, schema, planner_context, true)?;
             // Verify that columns of all SortExprs exist in the schema:
             for expr in expr_vec.iter() {
                 for column in expr.to_columns()?.iter() {
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index c0dcd4ae1e..0179431ac8 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3778,10 +3778,10 @@ query error DataFusion error: Arrow error: Invalid argument error: must either s
 select rank() over (RANGE between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
        from (select 1 a union select 2 a) q;
 
-# TODO: this is different to Postgres which returns [1, 1] for `rnk`.
-query I
-select rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
-       from (select 1 a union select 2 a) q ORDER BY rnk
+query II
+select a,
+       rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
+       from (select 1 a union select 2 a) q ORDER BY a
 ----
-1
-2
+1 1
+2 1

Reply via email to