This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 205e315ed3 fix: RANGE frame for corner cases with empty ORDER BY
clause should be treated as constant sort (#8445)
205e315ed3 is described below
commit 205e315ed3eafbb016ffc5ac62a3be07734a8885
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Dec 8 01:37:03 2023 -0800
fix: RANGE frame for corner cases with empty ORDER BY clause should be
treated as constant sort (#8445)
* fix: RANGE frame for corner cases with empty ORDER BY clause should be
treated as constant sort
* fix
* Make the test not flaky
* fix clippy
---
datafusion/expr/src/window_frame.rs | 56 ++++++++++++++++---------
datafusion/proto/src/logical_plan/from_proto.rs | 8 ++--
datafusion/sql/src/expr/function.rs | 10 +++--
datafusion/sqllogictest/test_files/window.slt | 16 ++++---
4 files changed, 56 insertions(+), 34 deletions(-)
diff --git a/datafusion/expr/src/window_frame.rs
b/datafusion/expr/src/window_frame.rs
index 2a64f21b85..2701ca1ecf 100644
--- a/datafusion/expr/src/window_frame.rs
+++ b/datafusion/expr/src/window_frame.rs
@@ -23,6 +23,8 @@
//! - An ending frame boundary,
//! - An EXCLUDE clause.
+use crate::expr::Sort;
+use crate::Expr;
use datafusion_common::{plan_err, sql_err, DataFusionError, Result,
ScalarValue};
use sqlparser::ast;
use sqlparser::parser::ParserError::ParserError;
@@ -142,41 +144,57 @@ impl WindowFrame {
}
}
-/// Construct equivalent explicit window frames for implicit corner cases.
-/// With this processing, we may assume in downstream code that RANGE/GROUPS
-/// frames contain an appropriate ORDER BY clause.
-pub fn regularize(mut frame: WindowFrame, order_bys: usize) ->
Result<WindowFrame> {
- if frame.units == WindowFrameUnits::Range && order_bys != 1 {
+/// Regularizes the ORDER BY clause of a window definition for implicit corner
cases.
+pub fn regularize_window_order_by(
+ frame: &WindowFrame,
+ order_by: &mut Vec<Expr>,
+) -> Result<()> {
+ if frame.units == WindowFrameUnits::Range && order_by.len() != 1 {
// Normally, RANGE frames require an ORDER BY clause with exactly one
// column. However, an ORDER BY clause may be absent or present but
with
// more than one column in two edge cases:
// 1. start bound is UNBOUNDED or CURRENT ROW
// 2. end bound is CURRENT ROW or UNBOUNDED.
- // In these cases, we regularize the RANGE frame to be equivalent to a
ROWS
- // frame with the UNBOUNDED bounds.
- // Note that this follows Postgres behavior.
+ // In these cases, we regularize the ORDER BY clause if the ORDER BY
clause
+ // is absent. If an ORDER BY clause is present but has more than one
column,
+ // the ORDER BY clause is unchanged. Note that this follows Postgres
behavior.
if (frame.start_bound.is_unbounded()
|| frame.start_bound == WindowFrameBound::CurrentRow)
&& (frame.end_bound == WindowFrameBound::CurrentRow
|| frame.end_bound.is_unbounded())
{
- // If an ORDER BY clause is absent, the frame is equivalent to a
ROWS
- // frame with the UNBOUNDED bounds.
- // If an ORDER BY clause is present but has more than one column,
the
- // frame is unchanged.
- if order_bys == 0 {
- frame.units = WindowFrameUnits::Rows;
- frame.start_bound =
- WindowFrameBound::Preceding(ScalarValue::UInt64(None));
- frame.end_bound =
WindowFrameBound::Following(ScalarValue::UInt64(None));
+ // If an ORDER BY clause is absent, it is equivalent to an ORDER BY
clause
+ // with a constant value as the sort key.
+ // If an ORDER BY clause is present but has more than one column,
it is
+ // unchanged.
+ if order_by.is_empty() {
+ order_by.push(Expr::Sort(Sort::new(
+ Box::new(Expr::Literal(ScalarValue::UInt64(Some(1)))),
+ true,
+ false,
+ )));
}
- } else {
+ }
+ }
+ Ok(())
+}
+
+/// Checks if given window frame is valid. In particular, if the frame is RANGE
+/// with offset PRECEDING/FOLLOWING, it must have exactly one ORDER BY column.
+pub fn check_window_frame(frame: &WindowFrame, order_bys: usize) -> Result<()>
{
+ if frame.units == WindowFrameUnits::Range && order_bys != 1 {
+ // See `regularize_window_order_by`.
+ if !(frame.start_bound.is_unbounded()
+ || frame.start_bound == WindowFrameBound::CurrentRow)
+ || !(frame.end_bound == WindowFrameBound::CurrentRow
+ || frame.end_bound.is_unbounded())
+ {
plan_err!("RANGE requires exactly one ORDER BY column")?
}
} else if frame.units == WindowFrameUnits::Groups && order_bys == 0 {
plan_err!("GROUPS requires an ORDER BY clause")?
};
- Ok(frame)
+ Ok(())
}
/// There are five ways to describe starting and ending frame boundaries:
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 13576aaa08..22a3ed804a 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -39,6 +39,7 @@ use datafusion_common::{
internal_err, plan_datafusion_err, Column, Constraint, Constraints,
DFField,
DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result,
ScalarValue,
};
+use datafusion_expr::window_frame::{check_window_frame,
regularize_window_order_by};
use datafusion_expr::{
abs, acos, acosh, array, array_append, array_concat, array_dims,
array_element,
array_except, array_has, array_has_all, array_has_any, array_intersect,
array_length,
@@ -59,7 +60,6 @@ use datafusion_expr::{
sqrt, starts_with, string_to_array, strpos, struct_fun, substr,
substr_index,
substring, tan, tanh, to_hex, to_timestamp_micros, to_timestamp_millis,
to_timestamp_nanos, to_timestamp_seconds, translate, trim, trunc, upper,
uuid,
- window_frame::regularize,
AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction,
BuiltinScalarFunction,
Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet,
GroupingSet::GroupingSets,
@@ -1072,7 +1072,7 @@ pub fn parse_expr(
.iter()
.map(|e| parse_expr(e, registry))
.collect::<Result<Vec<_>, _>>()?;
- let order_by = expr
+ let mut order_by = expr
.order_by
.iter()
.map(|e| parse_expr(e, registry))
@@ -1082,7 +1082,8 @@ pub fn parse_expr(
.as_ref()
.map::<Result<WindowFrame, _>, _>(|window_frame| {
let window_frame = window_frame.clone().try_into()?;
- regularize(window_frame, order_by.len())
+ check_window_frame(&window_frame, order_by.len())
+ .map(|_| window_frame)
})
.transpose()?
.ok_or_else(|| {
@@ -1090,6 +1091,7 @@ pub fn parse_expr(
"missing window frame during
deserialization".to_string(),
)
})?;
+ regularize_window_order_by(&window_frame, &mut order_by)?;
match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
diff --git a/datafusion/sql/src/expr/function.rs
b/datafusion/sql/src/expr/function.rs
index 14ea20c3fa..73de4fa439 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -21,7 +21,7 @@ use datafusion_common::{
};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::function::suggest_valid_function;
-use datafusion_expr::window_frame::regularize;
+use datafusion_expr::window_frame::{check_window_frame,
regularize_window_order_by};
use datafusion_expr::{
expr, window_function, AggregateFunction, BuiltinScalarFunction, Expr,
WindowFrame,
WindowFunction,
@@ -92,7 +92,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema,
planner_context))
.collect::<Result<Vec<_>>>()?;
- let order_by = self.order_by_to_sort_expr(
+ let mut order_by = self.order_by_to_sort_expr(
&window.order_by,
schema,
planner_context,
@@ -104,14 +104,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.as_ref()
.map(|window_frame| {
let window_frame = window_frame.clone().try_into()?;
- regularize(window_frame, order_by.len())
+ check_window_frame(&window_frame, order_by.len())
+ .map(|_| window_frame)
})
.transpose()?;
+
let window_frame = if let Some(window_frame) = window_frame {
+ regularize_window_order_by(&window_frame, &mut order_by)?;
window_frame
} else {
WindowFrame::new(!order_by.is_empty())
};
+
if let Ok(fun) = self.find_window_func(&name) {
let expr = match fun {
WindowFunction::AggregateFunction(aggregate_fun) => {
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 5b69ead0ff..b660a9a0c2 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3763,15 +3763,13 @@ select a,
1 1
2 2
-# TODO: this is different to Postgres which returns [1, 1] for `rnk`.
-# Comment it because it is flaky now as it depends on the order of the `a`
column.
-# query II
-# select a,
-# rank() over (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING) rnk
-# from (select 1 a union select 2 a) q ORDER BY rnk
-# ----
-# 1 1
-# 2 2
+query II
+select a,
+ rank() over (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
rnk
+ from (select 1 a union select 2 a) q ORDER BY a
+----
+1 1
+2 1
# TODO: this works in Postgres which returns [1, 1].
query error DataFusion error: Arrow error: Invalid argument error: must either
specify a row count or at least one column