This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a4233adf84 Named window support (#6419)
a4233adf84 is described below
commit a4233adf8438376363fae6bacb75131400c11ab4
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu May 25 11:39:08 2023 +0300
Named window support (#6419)
* named window statements can be planned
* Remove unnecessary clone
* Semantic check is added, avro error fix PR is waiting
* SQLParser updates are handled better, a test is added.
* Fix document error
* Minor changes
* UnnamedExpr bug fix
* Refactor named window handling in select_to_plan()
* Remove unnecessary clone
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: Mustafa Akur <[email protected]>
---
.../tests/sqllogictests/test_files/groupby.slt | 35 +++++++
.../core/tests/sqllogictests/test_files/window.slt | 113 +++++++++++++++++++++
datafusion/sql/src/expr/function.rs | 21 +---
datafusion/sql/src/expr/mod.rs | 26 +----
datafusion/sql/src/expr/order_by.rs | 68 +++++++------
datafusion/sql/src/query.rs | 7 +-
datafusion/sql/src/select.rs | 57 ++++++++++-
datafusion/sql/src/statement.rs | 5 +-
8 files changed, 246 insertions(+), 86 deletions(-)
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 79520ff734..b81731e64c 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2214,3 +2214,38 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount
DESC) AS amounts,
FRA [200.0, 50.0] 250
GRC [80.0, 30.0] 110
TUR [100.0, 75.0] 175
+
+# test_ordering_sensitive_aggregation7
+# A lexicographical ordering requirement can be given as an
+# argument to the aggregate functions
+query TT
+EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount
DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY country) AS s
+ GROUP BY s.country
+----
+logical_plan
+Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS
FIRST, s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(s.amount)]]
+----SubqueryAlias: s
+------Sort: sales_global.country ASC NULLS LAST
+--------TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts,
SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
+----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?R
+SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC)
AS amounts,
+ SUM(s.amount) AS sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY country) AS s
+ GROUP BY s.country
+----
+FRA [200.0, 50.0] 250
+GRC [80.0, 30.0] 110
+TUR [100.0, 75.0] 175
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index b15bc90fb1..32f45dbb57 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -3023,3 +3023,116 @@ drop table annotated_data_finite2
statement ok
drop table annotated_data_infinite2
+
+# window3 spec is not used in window functions.
+# The query should still work.
+query RR
+SELECT
+ MAX(c12) OVER window1,
+ MIN(c12) OVER window2 as max1
+ FROM aggregate_test_100
+ WINDOW window1 AS (ORDER BY C12),
+ window2 AS (PARTITION BY C11),
+ window3 AS (ORDER BY C1)
+ ORDER BY C3
+ LIMIT 5
+----
+0.970671228336 0.970671228336
+0.850672105305 0.850672105305
+0.152498292972 0.152498292972
+0.369363046006 0.369363046006
+0.56535284223 0.56535284223
+
+query TT
+EXPLAIN SELECT
+ MAX(c12) OVER window1 as min1,
+ MIN(c12) OVER window2 as max1
+ FROM aggregate_test_100
+ WINDOW window1 AS (ORDER BY C12),
+ window2 AS (PARTITION BY C11),
+ window3 AS (ORDER BY C1)
+ ORDER BY C3
+ LIMIT 5
+----
+logical_plan
+Projection: min1, max1
+--Limit: skip=0, fetch=5
+----Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
+------Projection: MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS min1,
MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max1, aggregate_test_100.c3
+--------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
+----------Projection: aggregate_test_100.c3, aggregate_test_100.c12,
MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+------------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c12) PARTITION BY
[aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING]]
+--------------TableScan: aggregate_test_100 projection=[c3, c11, c12]
+physical_plan
+ProjectionExec: expr=[min1@0 as min1, max1@1 as max1]
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST]
+------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@3 as min1, MIN(aggregate_test_100.c12) PARTITION BY
[aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@2 as max1, c3@0 as c3]
+--------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12): Ok(Field {
name: "MAX(aggregate_test_100.c12)", data_type: Float64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }],
mode=[Sorted]
+----------SortExec: expr=[c12@1 ASC NULLS LAST]
+------------ProjectionExec: expr=[c3@0 as c3, c12@2 as c12,
MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as MIN(aggregate_test_100.c12)]
+--------------WindowAggExec: wdw=[MIN(aggregate_test_100.c12): Ok(Field {
name: "MIN(aggregate_test_100.c12)", data_type: Float64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(NULL)) }]
+----------------SortExec: expr=[c11@1 ASC NULLS LAST]
+------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3,
c11, c12], has_header=true
+
+# window1 spec is used multiple times under different aggregations.
+# The query should still work.
+query RR
+SELECT
+ MAX(c12) OVER window1 as min1,
+ MIN(c12) OVER window1 as max1
+ FROM aggregate_test_100
+ WINDOW window1 AS (ORDER BY C12)
+ ORDER BY C3
+ LIMIT 5
+----
+0.970671228336 0.014793053078
+0.850672105305 0.014793053078
+0.152498292972 0.014793053078
+0.369363046006 0.014793053078
+0.56535284223 0.014793053078
+
+query TT
+EXPLAIN SELECT
+ MAX(c12) OVER window1 as min1,
+ MIN(c12) OVER window1 as max1
+ FROM aggregate_test_100
+ WINDOW window1 AS (ORDER BY C12)
+ ORDER BY C3
+ LIMIT 5
+----
+logical_plan
+Projection: min1, max1
+--Limit: skip=0, fetch=5
+----Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
+------Projection: MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS min1,
MIN(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS max1, aggregate_test_100.c3
+--------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW, MIN(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c3, c12]
+physical_plan
+ProjectionExec: expr=[min1@0 as min1, max1@1 as max1]
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST]
+------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@2 as min1, MIN(aggregate_test_100.c12) ORDER BY
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@3 as max1, c3@0 as c3]
+--------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12): Ok(Field {
name: "MAX(aggregate_test_100.c12)", data_type: Float64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow },
MIN(aggregate_test_100.c12): Ok(Field { name: "MIN(aggregate_test_100.c12)",
data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units [...]
+----------SortExec: expr=[c12@1 ASC NULLS LAST]
+------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3,
c12], has_header=true
+
+# window2 spec is not defined
+statement error DataFusion error: Error during planning: The window window2 is
not defined!
+SELECT
+ MAX(c12) OVER window1 as min1,
+ MIN(c12) OVER window2 as max1
+ FROM aggregate_test_100
+ WINDOW window1 AS (ORDER BY C12)
+ ORDER BY C3
+ LIMIT 5
+
+# window1 spec is defined multiple times
+statement error DataFusion error: Error during planning: The window window1 is
defined multiple times!
+SELECT
+ MAX(c12) OVER window1 as min1,
+ FROM aggregate_test_100
+ WINDOW window1 AS (ORDER BY C12),
+ window1 AS (ORDER BY C3)
+ ORDER BY C3
+ LIMIT 5
\ No newline at end of file
diff --git a/datafusion/sql/src/expr/function.rs
b/datafusion/sql/src/expr/function.rs
index bf69c381cd..0c5b460ead 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -54,29 +54,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
};
// then, window function
- let window = function
- .over
- .take()
- // TODO support named windows
- .map(|window| match window {
- WindowType::WindowSpec(window_spec) => Ok(window_spec),
- WindowType::NamedWindow(name) =>
Err(DataFusionError::NotImplemented(
- format!("Named windows ({name}) are not supported"),
- )),
- })
- .transpose()?;
-
- if let Some(window) = window {
+ if let Some(WindowType::WindowSpec(window)) = function.over.take() {
let partition_by = window
.partition_by
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema,
planner_context))
.collect::<Result<Vec<_>>>()?;
- let order_by = window
- .order_by
- .into_iter()
- .map(|e| self.order_by_to_sort_expr(e, schema,
planner_context))
- .collect::<Result<Vec<_>>>()?;
+ let order_by =
+ self.order_by_to_sort_expr(&window.order_by, schema,
planner_context)?;
let window_frame = window
.window_frame
.as_ref()
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index 05136a4a06..66422aa43f 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -167,7 +167,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
leading_precision,
last_field,
fractional_seconds_precision,
- }) => self.sql_interval_to_expr(
+ })=> self.sql_interval_to_expr(
*value,
schema,
planner_context,
@@ -350,28 +350,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
within_group,
} = array_agg;
- let order_by = if let Some(mut order_by) = order_by {
- // TODO: Once sqlparser supports multiple order by clause, handle
it
- // see issue:
https://github.com/sqlparser-rs/sqlparser-rs/issues/875
- let order_by = match order_by.len() {
- 0 => {
- return Err(DataFusionError::NotImplemented(
- "ARRAY_AGG with empty ORDER BY not
supported".to_string(),
- ))
- }
- 1 => order_by.pop().unwrap(),
- n => {
- return Err(DataFusionError::NotImplemented(format!(
- "ARRAY_AGG only supports a single ORDER BY expression.
Got {n}"
- )))
- }
- };
-
- Some(vec![self.order_by_to_sort_expr(
- order_by,
- input_schema,
- planner_context,
- )?])
+ let order_by = if let Some(order_by) = order_by {
+ Some(self.order_by_to_sort_expr(&order_by, input_schema,
planner_context)?)
} else {
None
};
diff --git a/datafusion/sql/src/expr/order_by.rs
b/datafusion/sql/src/expr/order_by.rs
index d8c0e34f34..b32388f1bc 100644
--- a/datafusion/sql/src/expr/order_by.rs
+++ b/datafusion/sql/src/expr/order_by.rs
@@ -22,51 +22,53 @@ use datafusion_expr::Expr;
use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
- /// convert sql OrderByExpr to Expr::Sort
+ /// convert sql [OrderByExpr] to `Vec<Expr>`
pub(crate) fn order_by_to_sort_expr(
&self,
- e: OrderByExpr,
+ exprs: &[OrderByExpr],
schema: &DFSchema,
planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let OrderByExpr {
- asc,
- expr,
- nulls_first,
- } = e;
+ ) -> Result<Vec<Expr>> {
+ let mut expr_vec = vec![];
+ for e in exprs {
+ let OrderByExpr {
+ asc,
+ expr,
+ nulls_first,
+ } = e;
- let expr = match expr {
- SQLExpr::Value(Value::Number(v, _)) => {
- let field_index = v
- .parse::<usize>()
- .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+ let expr = match expr {
+ SQLExpr::Value(Value::Number(v, _)) => {
+ let field_index = v
+ .parse::<usize>()
+ .map_err(|err|
DataFusionError::Plan(err.to_string()))?;
- if field_index == 0 {
- return Err(DataFusionError::Plan(
- "Order by index starts at 1 for column
indexes".to_string(),
- ));
- } else if schema.fields().len() < field_index {
- return Err(DataFusionError::Plan(format!(
- "Order by column out of bounds, specified: {}, max:
{}",
- field_index,
- schema.fields().len()
- )));
- }
+ if field_index == 0 {
+ return Err(DataFusionError::Plan(
+ "Order by index starts at 1 for column
indexes".to_string(),
+ ));
+ } else if schema.fields().len() < field_index {
+ return Err(DataFusionError::Plan(format!(
+ "Order by column out of bounds, specified: {},
max: {}",
+ field_index,
+ schema.fields().len()
+ )));
+ }
- let field = schema.field(field_index - 1);
- Expr::Column(field.qualified_column())
- }
- e => self.sql_expr_to_logical_expr(e, schema, planner_context)?,
- };
- Ok({
+ let field = schema.field(field_index - 1);
+ Expr::Column(field.qualified_column())
+ }
+ e => self.sql_expr_to_logical_expr(e.clone(), schema,
planner_context)?,
+ };
let asc = asc.unwrap_or(true);
- Expr::Sort(Sort::new(
+ expr_vec.push(Expr::Sort(Sort::new(
Box::new(expr),
asc,
// when asc is true, by default nulls last to be consistent
with postgres
// postgres rule:
https://www.postgresql.org/docs/current/queries-order.html
nulls_first.unwrap_or(!asc),
- ))
- })
+ )))
+ }
+ Ok(expr_vec)
}
}
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index ea1f72e76c..27235a050d 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -143,11 +143,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return Ok(plan);
}
- let order_by_rex = order_by
- .into_iter()
- .map(|e| self.order_by_to_sort_expr(e, plan.schema(),
planner_context))
- .collect::<Result<Vec<_>>>()?;
-
+ let order_by_rex =
+ self.order_by_to_sort_expr(&order_by, plan.schema(),
planner_context)?;
LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
}
}
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 2acc2c7a8b..01bd740bf2 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -34,8 +34,9 @@ use datafusion_expr::{
CreateMemoryTable, DdlStatement, Expr, Filter, GroupingSet, LogicalPlan,
LogicalPlanBuilder, Partitioning,
};
-use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions};
-use sqlparser::ast::{Select, SelectItem, TableWithJoins};
+
+use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions,
WindowType};
+use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem,
TableWithJoins};
use std::collections::HashSet;
use std::sync::Arc;
@@ -43,7 +44,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// Generate a logic plan from an SQL select
pub(super) fn select_to_plan(
&self,
- select: Select,
+ mut select: Select,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
// check for unsupported syntax first
@@ -70,6 +71,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// process `where` clause
let plan = self.plan_selection(select.selection, plan,
planner_context)?;
+ // handle named windows before processing the projection expression
+ check_conflicting_windows(&select.named_window)?;
+ match_window_definitions(&mut select.projection,
&select.named_window)?;
+
// process the SELECT expressions, with wildcards expanded.
let select_exprs = self.prepare_select_exprs(
&plan,
@@ -520,3 +525,49 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok((plan, select_exprs_post_aggr, having_expr_post_aggr))
}
}
+
+// If any window name is defined more than once, we raise an error.
+fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) ->
Result<()> {
+ for (i, window_def_i) in window_defs.iter().enumerate() {
+ for window_def_j in window_defs.iter().skip(i + 1) {
+ if window_def_i.0 == window_def_j.0 {
+ return Err(DataFusionError::Plan(format!(
+ "The window {} is defined multiple times!",
+ window_def_i.0
+ )));
+ }
+ }
+ }
+ Ok(())
+}
+
+// Replaces each named window referenced by a projection function with its
+// definition. Referencing a window name that is not defined is an error.
+fn match_window_definitions(
+ projection: &mut [SelectItem],
+ named_windows: &[NamedWindowDefinition],
+) -> Result<()> {
+ for proj in projection.iter_mut() {
+ if let SelectItem::ExprWithAlias {
+ expr: SQLExpr::Function(f),
+ alias: _,
+ }
+ | SelectItem::UnnamedExpr(SQLExpr::Function(f)) = proj
+ {
+ for NamedWindowDefinition(window_ident, window_spec) in
named_windows.iter() {
+ if let Some(WindowType::NamedWindow(ident)) = &f.over {
+ if ident.eq(window_ident) {
+ f.over =
Some(WindowType::WindowSpec(window_spec.clone()))
+ }
+ }
+ }
+ // All named windows must be defined with a WindowSpec.
+ if let Some(WindowType::NamedWindow(ident)) = &f.over {
+ return Err(DataFusionError::Plan(format!(
+ "The window {ident} is not defined!"
+ )));
+ }
+ }
+ }
+ Ok(())
+}
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index e5ee3d43d9..dada4b147b 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -590,10 +590,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
));
}
// Convert each OrderByExpr to a SortExpr:
- let result = order_exprs
- .into_iter()
- .map(|e| self.order_by_to_sort_expr(e, schema, planner_context))
- .collect::<Result<Vec<_>>>()?;
+ let result = self.order_by_to_sort_expr(&order_exprs, schema,
planner_context)?;
// Verify that columns of all SortExprs exist in the schema:
for expr in result.iter() {
for column in expr.to_columns()?.iter() {