This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 439339a651 fix: Literal in `ORDER BY` window definition should not be
an ordinal referring to relation column (#8419)
439339a651 is described below
commit 439339a6519f48b672615ce6acac8d48b8be4b8f
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Dec 6 13:45:28 2023 -0800
fix: Literal in `ORDER BY` window definition should not be an ordinal
referring to relation column (#8419)
* fix: RANGE frame can be regularized to ROWS frame only if empty ORDER BY
clause
* Fix flaky test
* Update test comment
* Add code comment
* Update
* fix: Literal in window definition should not refer to relation column
* Remove unused import
* Update datafusion/sql/src/expr/function.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Add code comment
* Fix format
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/physical-expr/src/sort_expr.rs | 8 ++------
.../physical-plan/src/windows/bounded_window_agg_exec.rs | 4 ++--
datafusion/sql/src/expr/function.rs | 11 ++++++++---
datafusion/sql/src/expr/mod.rs | 7 ++++++-
datafusion/sql/src/expr/order_by.rs | 9 +++++++--
datafusion/sql/src/query.rs | 2 +-
datafusion/sql/src/statement.rs | 3 ++-
datafusion/sqllogictest/test_files/window.slt | 12 ++++++------
8 files changed, 34 insertions(+), 22 deletions(-)
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index 664a6b65b7..914d76f926 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -26,7 +26,7 @@ use crate::PhysicalExpr;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
-use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_common::Result;
use datafusion_expr::ColumnarValue;
/// Represents Sort operation for a column in a RecordBatch
@@ -65,11 +65,7 @@ impl PhysicalSortExpr {
let value_to_sort = self.expr.evaluate(batch)?;
let array_to_sort = match value_to_sort {
ColumnarValue::Array(array) => array,
- ColumnarValue::Scalar(scalar) => {
- return exec_err!(
- "Sort operation is not applicable to scalar value {scalar}"
- );
- }
+ ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
};
Ok(SortColumn {
values: array_to_sort,
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 9e4d6c1370..f988b28cce 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -51,7 +51,7 @@ use datafusion_common::utils::{
evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
get_record_batch_at_indices, get_row_at_idx,
};
-use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
+use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
use datafusion_expr::ColumnarValue;
@@ -585,7 +585,7 @@ impl LinearSearch {
.map(|item| match item.evaluate(record_batch)? {
ColumnarValue::Array(array) => Ok(array),
ColumnarValue::Scalar(scalar) => {
- plan_err!("Sort operation is not applicable to scalar value {scalar}")
+ scalar.to_array_of_size(record_batch.num_rows())
}
})
.collect()
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 958e038798..14ea20c3fa 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -92,8 +92,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
- let order_by =
- self.order_by_to_sort_expr(&window.order_by, schema, planner_context)?;
+ let order_by = self.order_by_to_sort_expr(
+ &window.order_by,
+ schema,
+ planner_context,
+ // Numeric literals in window function ORDER BY are treated as constants
+ false,
+ )?;
let window_frame = window
.window_frame
.as_ref()
@@ -143,7 +148,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// next, aggregate built-ins
if let Ok(fun) = AggregateFunction::from_str(&name) {
let order_by =
- self.order_by_to_sort_expr(&order_by, schema, planner_context)?;
+ self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?;
let order_by = (!order_by.is_empty()).then_some(order_by);
let args = self.function_args_to_expr(args, schema, planner_context)?;
let filter: Option<Box<Expr>> = filter
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index 2f44466c79..27351e10eb 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -555,7 +555,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} = array_agg;
let order_by = if let Some(order_by) = order_by {
- Some(self.order_by_to_sort_expr(&order_by, input_schema, planner_context)?)
+ Some(self.order_by_to_sort_expr(
+ &order_by,
+ input_schema,
+ planner_context,
+ true,
+ )?)
} else {
None
};
diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs
index 1dccc2376f..772255bd97 100644
--- a/datafusion/sql/src/expr/order_by.rs
+++ b/datafusion/sql/src/expr/order_by.rs
@@ -24,12 +24,17 @@ use datafusion_expr::Expr;
use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
- /// convert sql [OrderByExpr] to `Vec<Expr>`
+ /// Convert sql [OrderByExpr] to `Vec<Expr>`.
+ ///
+ /// If `literal_to_column` is true, treat any numeric literals (e.g. `2`) as a 1 based index
+ /// into the SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`).
+ /// If false, interpret numeric literals as constant values.
pub(crate) fn order_by_to_sort_expr(
&self,
exprs: &[OrderByExpr],
schema: &DFSchema,
planner_context: &mut PlannerContext,
+ literal_to_column: bool,
) -> Result<Vec<Expr>> {
let mut expr_vec = vec![];
for e in exprs {
@@ -40,7 +45,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} = e;
let expr = match expr {
- SQLExpr::Value(Value::Number(v, _)) => {
+ SQLExpr::Value(Value::Number(v, _)) if literal_to_column => {
let field_index = v
.parse::<usize>()
.map_err(|err| plan_datafusion_err!("{}", err))?;
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index 643f41d844..dd4cab1262 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -161,7 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
let order_by_rex =
- self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context)?;
+ self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?;
if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
// In case of `DISTINCT ON` we must capture the sort expressions since during the plan
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index a64010a7c3..4220e83316 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -710,7 +710,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut all_results = vec![];
for expr in order_exprs {
// Convert each OrderByExpr to a SortExpr:
- let expr_vec = self.order_by_to_sort_expr(&expr, schema, planner_context)?;
+ let expr_vec =
+ self.order_by_to_sort_expr(&expr, schema, planner_context, true)?;
// Verify that columns of all SortExprs exist in the schema:
for expr in expr_vec.iter() {
for column in expr.to_columns()?.iter() {
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index c0dcd4ae1e..0179431ac8 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3778,10 +3778,10 @@ query error DataFusion error: Arrow error: Invalid argument error: must either s
select rank() over (RANGE between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q;
-# TODO: this is different to Postgres which returns [1, 1] for `rnk`.
-query I
-select rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
- from (select 1 a union select 2 a) q ORDER BY rnk
+query II
+select a,
+ rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
+ from (select 1 a union select 2 a) q ORDER BY a
----
-1
-2
+1 1
+2 1