This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b6146b91c Remove Option from window frame (#4516)
b6146b91c is described below
commit b6146b91c5506da1f2c95dd1348cc8fdd49c9020
Author: Mustafa akur <[email protected]>
AuthorDate: Mon Dec 5 20:53:35 2022 +0300
Remove Option from window frame (#4516)
* Handle a `None` window frame with a non-empty ORDER BY
* Handle a `None` window frame with an empty ORDER BY
* Remove window frame option
* Minor changes
* Use ScalarValue::Null for unbounded
* Resolve proto errors
* Combine functions under `new`
---
datafusion/core/src/dataframe.rs | 5 +-
datafusion/core/src/physical_plan/planner.rs | 9 +-
datafusion/core/src/physical_plan/windows/mod.rs | 10 +-
datafusion/core/tests/sql/select.rs | 16 +-
datafusion/core/tests/sql/window.rs | 372 ++++++++++-----------
datafusion/expr/src/expr.rs | 20 +-
datafusion/expr/src/utils.rs | 22 +-
datafusion/expr/src/window_frame.rs | 92 +++--
datafusion/optimizer/src/type_coercion.rs | 68 ++--
datafusion/physical-expr/src/window/aggregate.rs | 14 +-
datafusion/physical-expr/src/window/built_in.rs | 14 +-
.../physical-expr/src/window/window_frame_state.rs | 28 +-
datafusion/proto/proto/datafusion.proto | 6 +-
datafusion/proto/src/from_proto.rs | 20 +-
datafusion/proto/src/generated/pbjson.rs | 38 +--
datafusion/proto/src/generated/prost.rs | 12 +-
datafusion/proto/src/lib.rs | 8 +-
datafusion/proto/src/physical_plan/mod.rs | 2 +-
datafusion/proto/src/to_proto.rs | 7 +-
datafusion/sql/src/planner.rs | 111 +++---
20 files changed, 425 insertions(+), 449 deletions(-)
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index d7b7ccc94..8d02198b9 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -850,7 +850,8 @@ mod tests {
use arrow::datatypes::DataType;
use datafusion_expr::{
avg, cast, count, count_distinct, create_udf, lit, max, min, sum,
- BuiltInWindowFunction, ScalarFunctionImplementation, Volatility,
WindowFunction,
+ BuiltInWindowFunction, ScalarFunctionImplementation, Volatility,
WindowFrame,
+ WindowFunction,
};
use datafusion_physical_expr::expressions::Column;
@@ -896,7 +897,7 @@ mod tests {
args: vec![col("aggregate_test_100.c1")],
partition_by: vec![col("aggregate_test_100.c2")],
order_by: vec![],
- window_frame: None,
+ window_frame: WindowFrame::new(false),
};
let t2 = t.select(vec![col("c1"), first_row])?;
let plan = t2.plan.clone();
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index d11b27670..62d44fac7 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1484,15 +1484,14 @@ pub fn create_window_expr_with_name(
)),
})
.collect::<Result<Vec<_>>>()?;
- if let Some(ref window_frame) = window_frame {
- if !is_window_valid(window_frame) {
- return Err(DataFusionError::Execution(format!(
+ if !is_window_valid(window_frame) {
+ return Err(DataFusionError::Execution(format!(
"Invalid window frame: start bound ({}) cannot be
larger than end bound ({})",
window_frame.start_bound, window_frame.end_bound
)));
- }
}
- let window_frame = window_frame.clone().map(Arc::new);
+
+ let window_frame = Arc::new(window_frame.clone());
windows::create_window_expr(
fun,
name,
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs
b/datafusion/core/src/physical_plan/windows/mod.rs
index 60ddb6f5d..0f837e581 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -51,7 +51,7 @@ pub fn create_window_expr(
args: &[Arc<dyn PhysicalExpr>],
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
- window_frame: Option<Arc<WindowFrame>>,
+ window_frame: Arc<WindowFrame>,
input_schema: &Schema,
) -> Result<Arc<dyn WindowExpr>> {
Ok(match fun {
@@ -194,7 +194,7 @@ mod tests {
&[col("c3", &schema)?],
&[],
&[],
- Some(Arc::new(WindowFrame::default())),
+ Arc::new(WindowFrame::new(false)),
schema.as_ref(),
)?,
create_window_expr(
@@ -203,7 +203,7 @@ mod tests {
&[col("c3", &schema)?],
&[],
&[],
- Some(Arc::new(WindowFrame::default())),
+ Arc::new(WindowFrame::new(false)),
schema.as_ref(),
)?,
create_window_expr(
@@ -212,7 +212,7 @@ mod tests {
&[col("c3", &schema)?],
&[],
&[],
- Some(Arc::new(WindowFrame::default())),
+ Arc::new(WindowFrame::new(false)),
schema.as_ref(),
)?,
],
@@ -260,7 +260,7 @@ mod tests {
&[col("a", &schema)?],
&[],
&[],
- Some(Arc::new(WindowFrame::default())),
+ Arc::new(WindowFrame::new(false)),
schema.as_ref(),
)?],
blocking_exec,
diff --git a/datafusion/core/tests/sql/select.rs
b/datafusion/core/tests/sql/select.rs
index 6d7014507..5a56247e4 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -893,16 +893,16 @@ async fn query_on_string_dictionary() -> Result<()> {
assert_batches_sorted_eq!(expected, &actual);
// window functions
- let sql = "SELECT d1, row_number() OVER (partition by d1) FROM test";
+ let sql = "SELECT d1, row_number() OVER (partition by d1) as rn1 FROM
test";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
- "+-------+--------------+",
- "| d1 | ROW_NUMBER() |",
- "+-------+--------------+",
- "| | 1 |",
- "| one | 1 |",
- "| three | 1 |",
- "+-------+--------------+",
+ "+-------+-----+",
+ "| d1 | rn1 |",
+ "+-------+-----+",
+ "| | 1 |",
+ "| one | 1 |",
+ "| three | 1 |",
+ "+-------+-----+",
];
assert_batches_sorted_eq!(expected, &actual);
diff --git a/datafusion/core/tests/sql/window.rs
b/datafusion/core/tests/sql/window.rs
index 6d30d53f5..f8fbe67ab 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -24,23 +24,23 @@ async fn csv_query_window_with_empty_over() -> Result<()> {
register_aggregate_csv(&ctx).await?;
let sql = "select \
c9, \
- count(c5) over (), \
- max(c5) over (), \
- min(c5) over () \
+ count(c5) over () as count1, \
+ max(c5) over () as max1, \
+ min(c5) over () as min1 \
from aggregate_test_100 \
order by c9 \
limit 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
-
"+-----------+------------------------------+----------------------------+----------------------------+",
- "| c9 | COUNT(aggregate_test_100.c5) |
MAX(aggregate_test_100.c5) | MIN(aggregate_test_100.c5) |",
-
"+-----------+------------------------------+----------------------------+----------------------------+",
- "| 28774375 | 100 | 2143473091
| -2141999138 |",
- "| 63044568 | 100 | 2143473091
| -2141999138 |",
- "| 141047417 | 100 | 2143473091
| -2141999138 |",
- "| 141680161 | 100 | 2143473091
| -2141999138 |",
- "| 145294611 | 100 | 2143473091
| -2141999138 |",
-
"+-----------+------------------------------+----------------------------+----------------------------+",
+ "+-----------+--------+------------+-------------+",
+ "| c9 | count1 | max1 | min1 |",
+ "+-----------+--------+------------+-------------+",
+ "| 28774375 | 100 | 2143473091 | -2141999138 |",
+ "| 63044568 | 100 | 2143473091 | -2141999138 |",
+ "| 141047417 | 100 | 2143473091 | -2141999138 |",
+ "| 141680161 | 100 | 2143473091 | -2141999138 |",
+ "| 145294611 | 100 | 2143473091 | -2141999138 |",
+ "+-----------+--------+------------+-------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -53,25 +53,25 @@ async fn csv_query_window_with_partition_by() -> Result<()>
{
register_aggregate_csv(&ctx).await?;
let sql = "select \
c9, \
- sum(cast(c4 as Int)) over (partition by c3), \
- avg(cast(c4 as Int)) over (partition by c3), \
- count(cast(c4 as Int)) over (partition by c3), \
- max(cast(c4 as Int)) over (partition by c3), \
- min(cast(c4 as Int)) over (partition by c3) \
+ sum(cast(c4 as Int)) over (partition by c3) as sum1, \
+ avg(cast(c4 as Int)) over (partition by c3) as avg1, \
+ count(cast(c4 as Int)) over (partition by c3) as count1, \
+ max(cast(c4 as Int)) over (partition by c3) as max1, \
+ min(cast(c4 as Int)) over (partition by c3) as min1 \
from aggregate_test_100 \
order by c9 \
limit 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+",
- "| c9 | SUM(aggregate_test_100.c4) | AVG(aggregate_test_100.c4)
| COUNT(aggregate_test_100.c4) | MAX(aggregate_test_100.c4) |
MIN(aggregate_test_100.c4) |",
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+",
- "| 28774375 | -16110 | -16110
| 1 | -16110 | -16110
|",
- "| 63044568 | 3917 | 3917
| 1 | 3917 | 3917
|",
- "| 141047417 | -38455 | -19227.5
| 2 | -16974 | -21481
|",
- "| 141680161 | -1114 | -1114
| 1 | -1114 | -1114
|",
- "| 145294611 | 15673 | 15673
| 1 | 15673 | 15673
|",
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+",
+ "+-----------+--------+----------+--------+--------+--------+",
+ "| c9 | sum1 | avg1 | count1 | max1 | min1 |",
+ "+-----------+--------+----------+--------+--------+--------+",
+ "| 28774375 | -16110 | -16110 | 1 | -16110 | -16110 |",
+ "| 63044568 | 3917 | 3917 | 1 | 3917 | 3917 |",
+ "| 141047417 | -38455 | -19227.5 | 2 | -16974 | -21481 |",
+ "| 141680161 | -1114 | -1114 | 1 | -1114 | -1114 |",
+ "| 145294611 | 15673 | 15673 | 1 | 15673 | 15673 |",
+ "+-----------+--------+----------+--------+--------+--------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -83,28 +83,28 @@ async fn csv_query_window_with_order_by() -> Result<()> {
register_aggregate_csv(&ctx).await?;
let sql = "select \
c9, \
- sum(c5) over (order by c9), \
- avg(c5) over (order by c9), \
- count(c5) over (order by c9), \
- max(c5) over (order by c9), \
- min(c5) over (order by c9), \
- first_value(c5) over (order by c9), \
- last_value(c5) over (order by c9), \
- nth_value(c5, 2) over (order by c9) \
+ sum(c5) over (order by c9) as sum1, \
+ avg(c5) over (order by c9) as avg1, \
+ count(c5) over (order by c9) as count1, \
+ max(c5) over (order by c9) as max1, \
+ min(c5) over (order by c9) as min1, \
+ first_value(c5) over (order by c9) as fv1, \
+ last_value(c5) over (order by c9) as lv1, \
+ nth_value(c5, 2) over (order by c9) as nv1 \
from aggregate_test_100 \
order by c9 \
limit 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+------------------------------------+-----------------------------------+-------------------------------------------+",
- "| c9 | SUM(aggregate_test_100.c5) | AVG(aggregate_test_100.c5)
| COUNT(aggregate_test_100.c5) | MAX(aggregate_test_100.c5) |
MIN(aggregate_test_100.c5) | FIRST_VALUE(aggregate_test_100.c5) |
LAST_VALUE(aggregate_test_100.c5) | NTH_VALUE(aggregate_test_100.c5,Int64(2))
|",
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+------------------------------------+-----------------------------------+-------------------------------------------+",
- "| 28774375 | 61035129 | 61035129
| 1 | 61035129 | 61035129
| 61035129 | 61035129
| |",
- "| 63044568 | -47938237 | -23969118.5
| 2 | 61035129 | -108973366
| 61035129 | -108973366
| -108973366 |",
- "| 141047417 | 575165281 | 191721760.33333334
| 3 | 623103518 | -108973366
| 61035129 | 623103518
| -108973366 |",
- "| 141680161 | -1352462829 | -338115707.25
| 4 | 623103518 | -1927628110
| 61035129 | -1927628110
| -108973366 |",
- "| 145294611 | -3251637940 | -650327588
| 5 | 623103518 | -1927628110
| 61035129 | -1899175111
| -108973366 |",
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+------------------------------------+-----------------------------------+-------------------------------------------+",
+
"+-----------+-------------+--------------------+--------+-----------+-------------+----------+-------------+------------+",
+ "| c9 | sum1 | avg1 | count1 | max1 |
min1 | fv1 | lv1 | nv1 |",
+
"+-----------+-------------+--------------------+--------+-----------+-------------+----------+-------------+------------+",
+ "| 28774375 | 61035129 | 61035129 | 1 | 61035129 |
61035129 | 61035129 | 61035129 | |",
+ "| 63044568 | -47938237 | -23969118.5 | 2 | 61035129 |
-108973366 | 61035129 | -108973366 | -108973366 |",
+ "| 141047417 | 575165281 | 191721760.33333334 | 3 | 623103518 |
-108973366 | 61035129 | 623103518 | -108973366 |",
+ "| 141680161 | -1352462829 | -338115707.25 | 4 | 623103518 |
-1927628110 | 61035129 | -1927628110 | -108973366 |",
+ "| 145294611 | -3251637940 | -650327588 | 5 | 623103518 |
-1927628110 | 61035129 | -1899175111 | -108973366 |",
+
"+-----------+-------------+--------------------+--------+-----------+-------------+----------+-------------+------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -116,28 +116,28 @@ async fn csv_query_window_with_partition_by_order_by() ->
Result<()> {
register_aggregate_csv(&ctx).await?;
let sql = "select \
c9, \
- sum(c5) over (partition by c4 order by c9), \
- avg(c5) over (partition by c4 order by c9), \
- count(c5) over (partition by c4 order by c9), \
- max(c5) over (partition by c4 order by c9), \
- min(c5) over (partition by c4 order by c9), \
- first_value(c5) over (partition by c4 order by c9), \
- last_value(c5) over (partition by c4 order by c9), \
- nth_value(c5, 2) over (partition by c4 order by c9) \
+ sum(c5) over (partition by c4 order by c9) as sum1, \
+ avg(c5) over (partition by c4 order by c9) as avg1, \
+ count(c5) over (partition by c4 order by c9) as count1, \
+ max(c5) over (partition by c4 order by c9) as max1, \
+ min(c5) over (partition by c4 order by c9) as min1, \
+ first_value(c5) over (partition by c4 order by c9) as fv1, \
+ last_value(c5) over (partition by c4 order by c9) as lv1, \
+ nth_value(c5, 2) over (partition by c4 order by c9) as nv1 \
from aggregate_test_100 \
order by c9 \
limit 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+------------------------------------+-----------------------------------+-------------------------------------------+",
- "| c9 | SUM(aggregate_test_100.c5) | AVG(aggregate_test_100.c5)
| COUNT(aggregate_test_100.c5) | MAX(aggregate_test_100.c5) |
MIN(aggregate_test_100.c5) | FIRST_VALUE(aggregate_test_100.c5) |
LAST_VALUE(aggregate_test_100.c5) | NTH_VALUE(aggregate_test_100.c5,Int64(2))
|",
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+------------------------------------+-----------------------------------+-------------------------------------------+",
- "| 28774375 | 61035129 | 61035129
| 1 | 61035129 | 61035129
| 61035129 | 61035129
| |",
- "| 63044568 | -108973366 | -108973366
| 1 | -108973366 | -108973366
| -108973366 | -108973366
| |",
- "| 141047417 | 623103518 | 623103518
| 1 | 623103518 | 623103518
| 623103518 | 623103518
| |",
- "| 141680161 | -1927628110 | -1927628110
| 1 | -1927628110 | -1927628110
| -1927628110 | -1927628110
| |",
- "| 145294611 | -1899175111 | -1899175111
| 1 | -1899175111 | -1899175111
| -1899175111 | -1899175111
| |",
-
"+-----------+----------------------------+----------------------------+------------------------------+----------------------------+----------------------------+------------------------------------+-----------------------------------+-------------------------------------------+"
+
"+-----------+-------------+-------------+--------+-------------+-------------+-------------+-------------+-----+",
+ "| c9 | sum1 | avg1 | count1 | max1 | min1
| fv1 | lv1 | nv1 |",
+
"+-----------+-------------+-------------+--------+-------------+-------------+-------------+-------------+-----+",
+ "| 28774375 | 61035129 | 61035129 | 1 | 61035129 |
61035129 | 61035129 | 61035129 | |",
+ "| 63044568 | -108973366 | -108973366 | 1 | -108973366 |
-108973366 | -108973366 | -108973366 | |",
+ "| 141047417 | 623103518 | 623103518 | 1 | 623103518 |
623103518 | 623103518 | 623103518 | |",
+ "| 141680161 | -1927628110 | -1927628110 | 1 | -1927628110 |
-1927628110 | -1927628110 | -1927628110 | |",
+ "| 145294611 | -1899175111 | -1899175111 | 1 | -1899175111 |
-1899175111 | -1899175111 | -1899175111 | |",
+
"+-----------+-------------+-------------+--------+-------------+-------------+-------------+-------------+-----+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -149,11 +149,11 @@ async fn window() -> Result<()> {
"SELECT \
c1, \
c2, \
- SUM(c2) OVER (), \
- COUNT(c2) OVER (), \
- MAX(c2) OVER (), \
- MIN(c2) OVER (), \
- AVG(c2) OVER () \
+ SUM(c2) OVER () as sum1, \
+ COUNT(c2) OVER () as count1, \
+ MAX(c2) OVER () as max1, \
+ MIN(c2) OVER () as min1, \
+ AVG(c2) OVER () as avg1 \
FROM test \
ORDER BY c1, c2 \
LIMIT 5",
@@ -166,15 +166,15 @@ async fn window() -> Result<()> {
assert_eq!(results.len(), 1);
let expected = vec![
-
"+----+----+--------------+----------------+--------------+--------------+--------------+",
- "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) |
MIN(test.c2) | AVG(test.c2) |",
-
"+----+----+--------------+----------------+--------------+--------------+--------------+",
- "| 0 | 1 | 220 | 40 | 10 | 1
| 5.5 |",
- "| 0 | 2 | 220 | 40 | 10 | 1
| 5.5 |",
- "| 0 | 3 | 220 | 40 | 10 | 1
| 5.5 |",
- "| 0 | 4 | 220 | 40 | 10 | 1
| 5.5 |",
- "| 0 | 5 | 220 | 40 | 10 | 1
| 5.5 |",
-
"+----+----+--------------+----------------+--------------+--------------+--------------+",
+ "+----+----+------+--------+------+------+------+",
+ "| c1 | c2 | sum1 | count1 | max1 | min1 | avg1 |",
+ "+----+----+------+--------+------+------+------+",
+ "| 0 | 1 | 220 | 40 | 10 | 1 | 5.5 |",
+ "| 0 | 2 | 220 | 40 | 10 | 1 | 5.5 |",
+ "| 0 | 3 | 220 | 40 | 10 | 1 | 5.5 |",
+ "| 0 | 4 | 220 | 40 | 10 | 1 | 5.5 |",
+ "| 0 | 5 | 220 | 40 | 10 | 1 | 5.5 |",
+ "+----+----+------+--------+------+------+------+",
];
// window function shall respect ordering
@@ -188,15 +188,15 @@ async fn window_order_by() -> Result<()> {
"SELECT \
c1, \
c2, \
- ROW_NUMBER() OVER (ORDER BY c1, c2), \
- FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
- LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
- NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
- SUM(c2) OVER (ORDER BY c1, c2), \
- COUNT(c2) OVER (ORDER BY c1, c2), \
- MAX(c2) OVER (ORDER BY c1, c2), \
- MIN(c2) OVER (ORDER BY c1, c2), \
- AVG(c2) OVER (ORDER BY c1, c2) \
+ ROW_NUMBER() OVER (ORDER BY c1, c2) as rn1, \
+ FIRST_VALUE(c2) OVER (ORDER BY c1, c2) as fv1, \
+ LAST_VALUE(c2) OVER (ORDER BY c1, c2) as lv1, \
+ NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2) as nv1, \
+ SUM(c2) OVER (ORDER BY c1, c2) as sum1, \
+ COUNT(c2) OVER (ORDER BY c1, c2) as count1, \
+ MAX(c2) OVER (ORDER BY c1, c2) as max1, \
+ MIN(c2) OVER (ORDER BY c1, c2) as min1, \
+ AVG(c2) OVER (ORDER BY c1, c2) as avg1 \
FROM test \
ORDER BY c1, c2 \
LIMIT 5",
@@ -209,15 +209,15 @@ async fn window_order_by() -> Result<()> {
assert_eq!(results.len(), 1);
let expected = vec![
-
"+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
- "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2) | LAST_VALUE(test.c2)
| NTH_VALUE(test.c2,Int64(2)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) |
MIN(test.c2) | AVG(test.c2) |",
-
"+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
- "| 0 | 1 | 1 | 1 | 1
| | 1 | 1 | 1 |
1 | 1 |",
- "| 0 | 2 | 2 | 1 | 2
| 2 | 3 | 2 | 2 |
1 | 1.5 |",
- "| 0 | 3 | 3 | 1 | 3
| 2 | 6 | 3 | 3 |
1 | 2 |",
- "| 0 | 4 | 4 | 1 | 4
| 2 | 10 | 4 | 4 |
1 | 2.5 |",
- "| 0 | 5 | 5 | 1 | 5
| 2 | 15 | 5 | 5 |
1 | 3 |",
-
"+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
+
"+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
+ "| c1 | c2 | rn1 | fv1 | lv1 | nv1 | sum1 | count1 | max1 | min1 |
avg1 |",
+
"+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
+ "| 0 | 1 | 1 | 1 | 1 | | 1 | 1 | 1 | 1 | 1
|",
+ "| 0 | 2 | 2 | 1 | 2 | 2 | 3 | 2 | 2 | 1 | 1.5
|",
+ "| 0 | 3 | 3 | 1 | 3 | 2 | 6 | 3 | 3 | 1 | 2
|",
+ "| 0 | 4 | 4 | 1 | 4 | 2 | 10 | 4 | 4 | 1 | 2.5
|",
+ "| 0 | 5 | 5 | 1 | 5 | 2 | 15 | 5 | 5 | 1 | 3
|",
+
"+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
];
// window function shall respect ordering
@@ -231,11 +231,11 @@ async fn window_partition_by() -> Result<()> {
"SELECT \
c1, \
c2, \
- SUM(c2) OVER (PARTITION BY c2), \
- COUNT(c2) OVER (PARTITION BY c2), \
- MAX(c2) OVER (PARTITION BY c2), \
- MIN(c2) OVER (PARTITION BY c2), \
- AVG(c2) OVER (PARTITION BY c2) \
+ SUM(c2) OVER (PARTITION BY c2) as sum1, \
+ COUNT(c2) OVER (PARTITION BY c2) as count1, \
+ MAX(c2) OVER (PARTITION BY c2) as max1, \
+ MIN(c2) OVER (PARTITION BY c2) as min1, \
+ AVG(c2) OVER (PARTITION BY c2) as avg1 \
FROM test \
ORDER BY c1, c2 \
LIMIT 5",
@@ -244,15 +244,15 @@ async fn window_partition_by() -> Result<()> {
.await?;
let expected = vec![
-
"+----+----+--------------+----------------+--------------+--------------+--------------+",
- "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) |
MIN(test.c2) | AVG(test.c2) |",
-
"+----+----+--------------+----------------+--------------+--------------+--------------+",
- "| 0 | 1 | 4 | 4 | 1 | 1
| 1 |",
- "| 0 | 2 | 8 | 4 | 2 | 2
| 2 |",
- "| 0 | 3 | 12 | 4 | 3 | 3
| 3 |",
- "| 0 | 4 | 16 | 4 | 4 | 4
| 4 |",
- "| 0 | 5 | 20 | 4 | 5 | 5
| 5 |",
-
"+----+----+--------------+----------------+--------------+--------------+--------------+",
+ "+----+----+------+--------+------+------+------+",
+ "| c1 | c2 | sum1 | count1 | max1 | min1 | avg1 |",
+ "+----+----+------+--------+------+------+------+",
+ "| 0 | 1 | 4 | 4 | 1 | 1 | 1 |",
+ "| 0 | 2 | 8 | 4 | 2 | 2 | 2 |",
+ "| 0 | 3 | 12 | 4 | 3 | 3 | 3 |",
+ "| 0 | 4 | 16 | 4 | 4 | 4 | 4 |",
+ "| 0 | 5 | 20 | 4 | 5 | 5 | 5 |",
+ "+----+----+------+--------+------+------+------+",
];
// window function shall respect ordering
@@ -266,15 +266,15 @@ async fn window_partition_by_order_by() -> Result<()> {
"SELECT \
c1, \
c2, \
- ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1), \
- FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
- LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
- NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1), \
- SUM(c2) OVER (PARTITION BY c2 ORDER BY c1), \
- COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1), \
- MAX(c2) OVER (PARTITION BY c2 ORDER BY c1), \
- MIN(c2) OVER (PARTITION BY c2 ORDER BY c1), \
- AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) \
+ ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1) as rn1, \
+ FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1) as fv1, \
+ LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1) as lv1, \
+ NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1) as nv1, \
+ SUM(c2) OVER (PARTITION BY c2 ORDER BY c1) as sum1, \
+ COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1) as count1, \
+ MAX(c2) OVER (PARTITION BY c2 ORDER BY c1) as max1, \
+ MIN(c2) OVER (PARTITION BY c2 ORDER BY c1) as min1, \
+ AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) as avg1 \
FROM test \
ORDER BY c1, c2 \
LIMIT 5",
@@ -283,15 +283,15 @@ async fn window_partition_by_order_by() -> Result<()> {
.await?;
let expected = vec![
-
"+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
- "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2 + test.c1) |
LAST_VALUE(test.c2 + test.c1) | NTH_VALUE(test.c2 + test.c1,Int64(1)) |
SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
-
"+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
- "| 0 | 1 | 1 | 1 | 1
| 1 | 1 | 1
| 1 | 1 | 1 |",
- "| 0 | 2 | 1 | 2 | 2
| 2 | 2 | 1
| 2 | 2 | 2 |",
- "| 0 | 3 | 1 | 3 | 3
| 3 | 3 | 1
| 3 | 3 | 3 |",
- "| 0 | 4 | 1 | 4 | 4
| 4 | 4 | 1
| 4 | 4 | 4 |",
- "| 0 | 5 | 1 | 5 | 5
| 5 | 5 | 1
| 5 | 5 | 5 |",
-
"+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
+
"+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
+ "| c1 | c2 | rn1 | fv1 | lv1 | nv1 | sum1 | count1 | max1 | min1 |
avg1 |",
+
"+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
+ "| 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1
|",
+ "| 0 | 2 | 1 | 2 | 2 | 2 | 2 | 1 | 2 | 2 | 2
|",
+ "| 0 | 3 | 1 | 3 | 3 | 3 | 3 | 1 | 3 | 3 | 3
|",
+ "| 0 | 4 | 1 | 4 | 4 | 4 | 4 | 1 | 4 | 4 | 4
|",
+ "| 0 | 5 | 1 | 5 | 5 | 5 | 5 | 1 | 5 | 5 | 5
|",
+
"+----+----+-----+-----+-----+-----+------+--------+------+------+------+",
];
// window function shall respect ordering
@@ -401,8 +401,8 @@ async fn window_expr_eliminate() -> Result<()> {
" Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a), MAX(d.seq)]]
[b:Utf8, MAX(d.a):Int64;N, MAX(d.seq):UInt64;N]",
" SubqueryAlias: d [seq:UInt64;N, a:Int64, b:Utf8]",
" SubqueryAlias: _data2 [seq:UInt64;N, a:Int64, b:Utf8]",
- " Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a
ASC NULLS LAST] AS seq, s.a, s.b [seq:UInt64;N, a:Int64, b:Utf8]",
- " WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY
[s.b] ORDER BY [s.a ASC NULLS LAST]]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY
[s.a ASC NULLS LAST]:UInt64;N, a:Int64, b:Utf8]",
+ " Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS seq, s.a,
s.b [seq:UInt64;N, a:Int64, b:Utf8]",
+ " WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY
[s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, a:Int64, b:Utf8]",
" SubqueryAlias: s [a:Int64, b:Utf8]",
" Union [a:Int64, b:Utf8]",
" Projection: Int64(1) AS a, Utf8(\"aa\") AS b
[a:Int64, b:Utf8]",
@@ -439,15 +439,15 @@ async fn window_expr_eliminate() -> Result<()> {
#[tokio::test]
async fn window_in_expression() -> Result<()> {
let ctx = SessionContext::new();
- let sql = "select 1 - lag(amount, 1) over (order by idx) from (values
('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount)";
+ let sql = "select 1 - lag(amount, 1) over (order by idx) as column1 from
(values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount)";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
-
"+-------------------------------------------------------------------+",
- "| Int64(1) - LAG(t.amount,Int64(1)) ORDER BY [t.idx ASC NULLS LAST]
|",
-
"+-------------------------------------------------------------------+",
- "|
|",
- "| -99
|",
-
"+-------------------------------------------------------------------+",
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| |",
+ "| -99 |",
+ "+---------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -479,22 +479,22 @@ async fn window_frame_empty() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
let sql = "SELECT \
- SUM(c3) OVER(),\
- COUNT(*) OVER ()\
+ SUM(c3) OVER() as sum1, \
+ COUNT(*) OVER () as count1 \
FROM aggregate_test_100 \
ORDER BY c9 \
LIMIT 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
- "+----------------------------+-----------------+",
- "| SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |",
- "+----------------------------+-----------------+",
- "| 781 | 100 |",
- "| 781 | 100 |",
- "| 781 | 100 |",
- "| 781 | 100 |",
- "| 781 | 100 |",
- "+----------------------------+-----------------+",
+ "+------+--------+",
+ "| sum1 | count1 |",
+ "+------+--------+",
+ "| 781 | 100 |",
+ "| 781 | 100 |",
+ "| 781 | 100 |",
+ "| 781 | 100 |",
+ "| 781 | 100 |",
+ "+------+--------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -613,20 +613,20 @@ async fn window_frame_order_by_asc_desc_large() ->
Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
- SUM(c5) OVER (ORDER BY c2 ASC, c6 DESC)
+ SUM(c5) OVER (ORDER BY c2 ASC, c6 DESC) as sum1
FROM aggregate_test_100
LIMIT 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
- "+----------------------------+",
- "| SUM(aggregate_test_100.c5) |",
- "+----------------------------+",
- "| -1383162419 |",
- "| -3265456275 |",
- "| -3909681744 |",
- "| -5241214934 |",
- "| -4246910946 |",
- "+----------------------------+",
+ "+-------------+",
+ "| sum1 |",
+ "+-------------+",
+ "| -1383162419 |",
+ "| -3265456275 |",
+ "| -3909681744 |",
+ "| -5241214934 |",
+ "| -4246910946 |",
+ "+-------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -637,21 +637,21 @@ async fn window_frame_order_by_desc_large() -> Result<()>
{
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
- SUM(c5) OVER (ORDER BY c2 DESC, c6 ASC)
+ SUM(c5) OVER (ORDER BY c2 DESC, c6 ASC) as sum1
FROM aggregate_test_100
ORDER BY c9
LIMIT 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
- "+----------------------------+",
- "| SUM(aggregate_test_100.c5) |",
- "+----------------------------+",
- "| 11212193439 |",
- "| 22799733943 |",
- "| 2935356871 |",
- "| 15810962683 |",
- "| 18035025006 |",
- "+----------------------------+",
+ "+-------------+",
+ "| sum1 |",
+ "+-------------+",
+ "| 11212193439 |",
+ "| 22799733943 |",
+ "| 2935356871 |",
+ "| 15810962683 |",
+ "| 18035025006 |",
+ "+-------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -662,20 +662,20 @@ async fn window_frame_order_by_null_timestamp_order_by()
-> Result<()> {
let ctx = SessionContext::new();
register_aggregate_null_cases_csv(&ctx).await?;
let sql = "SELECT
- SUM(c1) OVER (ORDER BY c2 DESC)
+ SUM(c1) OVER (ORDER BY c2 DESC) as summation1
FROM null_cases
LIMIT 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
- "+--------------------+",
- "| SUM(null_cases.c1) |",
- "+--------------------+",
- "| 962 |",
- "| 962 |",
- "| 962 |",
- "| 962 |",
- "| 962 |",
- "+--------------------+",
+ "+------------+",
+ "| summation1 |",
+ "+------------+",
+ "| 962 |",
+ "| 962 |",
+ "| 962 |",
+ "| 962 |",
+ "| 962 |",
+ "+------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -923,22 +923,22 @@ async fn window_frame_order_by_unique() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
let sql = "SELECT \
- SUM(c5) OVER (ORDER BY c5), \
- COUNT(*) OVER (ORDER BY c9) \
+ SUM(c5) OVER (ORDER BY c5) as sum1, \
+ COUNT(*) OVER (ORDER BY c9) as count1 \
FROM aggregate_test_100 \
ORDER BY c9 \
LIMIT 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
- "+----------------------------+-----------------+",
- "| SUM(aggregate_test_100.c5) | COUNT(UInt8(1)) |",
- "+----------------------------+-----------------+",
- "| -49877765574 | 1 |",
- "| -50025861694 | 2 |",
- "| -45402230071 | 3 |",
- "| -14557735645 | 4 |",
- "| -18365391649 | 5 |",
- "+----------------------------+-----------------+",
+ "+--------------+--------+",
+ "| sum1 | count1 |",
+ "+--------------+--------+",
+ "| -49877765574 | 1 |",
+ "| -50025861694 | 2 |",
+ "| -45402230071 | 3 |",
+ "| -14557735645 | 4 |",
+ "| -18365391649 | 5 |",
+ "+--------------+--------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
@@ -1627,7 +1627,7 @@ async fn test_window_agg_sort() -> Result<()> {
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST]@0 as sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST,
aggregate_test_100.c8 ASC NULLS LAST]@1 as sum2]",
+ "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} })]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} })]",
" SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index ecab1afd2..8d0d5c180 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -199,7 +199,7 @@ pub enum Expr {
/// List of order by expressions
order_by: Vec<Expr>,
/// Window frame
- window_frame: Option<window_frame::WindowFrame>,
+ window_frame: window_frame::WindowFrame,
},
/// aggregate function
AggregateUDF {
@@ -827,15 +827,11 @@ impl fmt::Debug for Expr {
if !order_by.is_empty() {
write!(f, " ORDER BY {:?}", order_by)?;
}
- if let Some(window_frame) = window_frame {
- write!(
- f,
- " {} BETWEEN {} AND {}",
- window_frame.units,
- window_frame.start_bound,
- window_frame.end_bound
- )?;
- }
+ write!(
+ f,
+ " {} BETWEEN {} AND {}",
+ window_frame.units, window_frame.start_bound,
window_frame.end_bound
+ )?;
Ok(())
}
Expr::AggregateFunction {
@@ -1187,9 +1183,7 @@ fn create_name(e: &Expr) -> Result<String> {
if !order_by.is_empty() {
parts.push(format!("ORDER BY {:?}", order_by));
}
- if let Some(window_frame) = window_frame {
- parts.push(format!("{}", window_frame));
- }
+ parts.push(format!("{}", window_frame));
Ok(parts.join(" "))
}
Expr::AggregateFunction {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 71088d2db..1fa3d06b7 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -789,7 +789,7 @@ pub fn check_all_column_from_schema(
#[cfg(test)]
mod tests {
use super::*;
- use crate::{col, AggregateFunction, WindowFunction};
+ use crate::{col, AggregateFunction, WindowFrame, WindowFunction};
#[test]
fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
@@ -806,28 +806,28 @@ mod tests {
args: vec![col("name")],
partition_by: vec![],
order_by: vec![],
- window_frame: None,
+ window_frame: WindowFrame::new(false),
};
let max2 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
args: vec![col("name")],
partition_by: vec![],
order_by: vec![],
- window_frame: None,
+ window_frame: WindowFrame::new(false),
};
let min3 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
args: vec![col("name")],
partition_by: vec![],
order_by: vec![],
- window_frame: None,
+ window_frame: WindowFrame::new(false),
};
let sum4 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
args: vec![col("age")],
partition_by: vec![],
order_by: vec![],
- window_frame: None,
+ window_frame: WindowFrame::new(false),
};
let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
let result = group_window_expr_by_sort_keys(exprs)?;
@@ -860,28 +860,28 @@ mod tests {
args: vec![col("name")],
partition_by: vec![],
order_by: vec![age_asc.clone(), name_desc.clone()],
- window_frame: None,
+ window_frame: WindowFrame::new(true),
};
let max2 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
args: vec![col("name")],
partition_by: vec![],
order_by: vec![],
- window_frame: None,
+ window_frame: WindowFrame::new(false),
};
let min3 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
args: vec![col("name")],
partition_by: vec![],
order_by: vec![age_asc.clone(), name_desc.clone()],
- window_frame: None,
+ window_frame: WindowFrame::new(true),
};
let sum4 = Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
args: vec![col("age")],
partition_by: vec![],
order_by: vec![name_desc.clone(), age_asc.clone(),
created_at_desc.clone()],
- window_frame: None,
+ window_frame: WindowFrame::new(true),
};
// FIXME use as_ref
let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
@@ -919,7 +919,7 @@ mod tests {
nulls_first: true,
},
],
- window_frame: None,
+ window_frame: WindowFrame::new(true),
},
Expr::WindowFunction {
fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
@@ -942,7 +942,7 @@ mod tests {
nulls_first: true,
},
],
- window_frame: None,
+ window_frame: WindowFrame::new(true),
},
];
let expected = vec![
diff --git a/datafusion/expr/src/window_frame.rs
b/datafusion/expr/src/window_frame.rs
index 5bf81d165..35790885e 100644
--- a/datafusion/expr/src/window_frame.rs
+++ b/datafusion/expr/src/window_frame.rs
@@ -66,33 +66,51 @@ impl TryFrom<ast::WindowFrame> for WindowFrame {
None => WindowFrameBound::CurrentRow,
};
- if let WindowFrameBound::Following(ScalarValue::Utf8(None)) =
start_bound {
- Err(DataFusionError::Execution(
- "Invalid window frame: start bound cannot be unbounded
following"
- .to_owned(),
- ))
- } else if let WindowFrameBound::Preceding(ScalarValue::Utf8(None)) =
end_bound {
- Err(DataFusionError::Execution(
- "Invalid window frame: end bound cannot be unbounded preceding"
- .to_owned(),
- ))
- } else {
- let units = value.units.into();
- Ok(Self {
- units,
- start_bound,
- end_bound,
- })
- }
+ if let WindowFrameBound::Following(val) = &start_bound {
+ if val.is_null() {
+ return Err(DataFusionError::Execution(
+ "Invalid window frame: start bound cannot be unbounded
following"
+ .to_owned(),
+ ));
+ }
+ } else if let WindowFrameBound::Preceding(val) = &end_bound {
+ if val.is_null() {
+ return Err(DataFusionError::Execution(
+ "Invalid window frame: end bound cannot be unbounded
preceding"
+ .to_owned(),
+ ));
+ }
+ };
+ Ok(Self {
+ units: value.units.into(),
+ start_bound,
+ end_bound,
+ })
}
}
-impl Default for WindowFrame {
- fn default() -> Self {
- WindowFrame {
- units: WindowFrameUnits::Range,
- start_bound: WindowFrameBound::Preceding(ScalarValue::Utf8(None)),
- end_bound: WindowFrameBound::CurrentRow,
+impl WindowFrame {
+ /// Creates a new, default window frame (with the meaning of default
depending on whether the
+ /// frame contains an `ORDER BY` clause.
+ pub fn new(has_order_by: bool) -> Self {
+ if has_order_by {
+ // This window frame covers the table (or partition if `PARTITION
BY` is used)
+ // from beginning to the `CURRENT ROW` (with same rank). It is
used when the `OVER`
+ // clause contains an `ORDER BY` clause but no frame.
+ WindowFrame {
+ units: WindowFrameUnits::Range,
+ start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
+ end_bound: WindowFrameBound::CurrentRow,
+ }
+ } else {
+ // This window frame covers the whole table (or partition if
`PARTITION BY` is used).
+ // It is used when the `OVER` clause does not contain an `ORDER
BY` clause and there is
+ // no frame.
+ WindowFrame {
+ units: WindowFrameUnits::Rows,
+ start_bound:
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+ end_bound:
WindowFrameBound::Following(ScalarValue::UInt64(None)),
+ }
}
}
}
@@ -137,15 +155,11 @@ impl TryFrom<ast::WindowFrameBound> for WindowFrameBound {
ast::WindowFrameBound::Preceding(Some(v)) => {
Self::Preceding(convert_frame_bound_to_scalar_value(*v)?)
}
- ast::WindowFrameBound::Preceding(None) => {
- Self::Preceding(ScalarValue::Utf8(None))
- }
+ ast::WindowFrameBound::Preceding(None) =>
Self::Preceding(ScalarValue::Null),
ast::WindowFrameBound::Following(Some(v)) => {
Self::Following(convert_frame_bound_to_scalar_value(*v)?)
}
- ast::WindowFrameBound::Following(None) => {
- Self::Following(ScalarValue::Utf8(None))
- }
+ ast::WindowFrameBound::Following(None) =>
Self::Following(ScalarValue::Null),
ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
})
}
@@ -183,15 +197,21 @@ pub fn convert_frame_bound_to_scalar_value(v: ast::Expr)
-> Result<ScalarValue>
impl fmt::Display for WindowFrameBound {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
- WindowFrameBound::Preceding(ScalarValue::Utf8(None)) => {
- f.write_str("UNBOUNDED PRECEDING")
+ WindowFrameBound::Preceding(n) => {
+ if n.is_null() {
+ f.write_str("UNBOUNDED PRECEDING")
+ } else {
+ write!(f, "{} PRECEDING", n)
+ }
}
- WindowFrameBound::Preceding(n) => write!(f, "{} PRECEDING", n),
WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
- WindowFrameBound::Following(ScalarValue::Utf8(None)) => {
- f.write_str("UNBOUNDED FOLLOWING")
+ WindowFrameBound::Following(n) => {
+ if n.is_null() {
+ f.write_str("UNBOUNDED FOLLOWING")
+ } else {
+ write!(f, "{} FOLLOWING", n)
+ }
}
- WindowFrameBound::Following(n) => write!(f, "{} FOLLOWING", n),
}
}
}
diff --git a/datafusion/optimizer/src/type_coercion.rs
b/datafusion/optimizer/src/type_coercion.rs
index 50f4e0838..0e06f80d8 100644
--- a/datafusion/optimizer/src/type_coercion.rs
+++ b/datafusion/optimizer/src/type_coercion.rs
@@ -431,12 +431,11 @@ fn convert_to_coerced_type(
value: &ScalarValue,
) -> Result<ScalarValue> {
match value {
- // In here we do casting either for ScalarValue::Utf8(None) or
+ // In here we do casting either for NULL types or
// ScalarValue::Utf8(Some(val)). The other types are already casted.
// The reason is that we convert the sqlparser result
// to the Utf8 for all possible cases. Hence the types other than Utf8
// are already casted to appropriate type. Therefore they can be
returned directly.
- ScalarValue::Utf8(None) => ScalarValue::try_from(coerced_type),
ScalarValue::Utf8(Some(val)) => {
// we need special handling for Interval types
if let DataType::Interval(..) = coerced_type {
@@ -445,7 +444,13 @@ fn convert_to_coerced_type(
ScalarValue::try_from_string(val.clone(), coerced_type)
}
}
- s => Ok(s.clone()),
+ s => {
+ if s.is_null() {
+ ScalarValue::try_from(coerced_type)
+ } else {
+ Ok(s.clone())
+ }
+ }
}
}
@@ -465,10 +470,10 @@ fn coerce_frame_bound(
}
fn get_coerced_window_frame(
- window_frame: Option<WindowFrame>,
+ window_frame: WindowFrame,
schema: &DFSchemaRef,
expressions: &[Expr],
-) -> Result<Option<WindowFrame>> {
+) -> Result<WindowFrame> {
fn get_coerced_type(column_type: &DataType) -> Result<DataType> {
if is_numeric(column_type) {
Ok(column_type.clone())
@@ -482,38 +487,31 @@ fn get_coerced_window_frame(
}
}
- if let Some(window_frame) = window_frame {
- let mut window_frame = window_frame;
- let current_types = expressions
- .iter()
- .map(|e| e.get_type(schema))
- .collect::<Result<Vec<_>>>()?;
- match &mut window_frame.units {
- WindowFrameUnits::Range => {
- let col_type = current_types.first().ok_or_else(|| {
- DataFusionError::Internal(
- "ORDER BY column cannot be empty".to_string(),
- )
- })?;
- let coerced_type = get_coerced_type(col_type)?;
- window_frame.start_bound =
- coerce_frame_bound(&coerced_type,
&window_frame.start_bound)?;
- window_frame.end_bound =
- coerce_frame_bound(&coerced_type,
&window_frame.end_bound)?;
- }
- WindowFrameUnits::Rows | WindowFrameUnits::Groups => {
- let coerced_type = DataType::UInt64;
- window_frame.start_bound =
- coerce_frame_bound(&coerced_type,
&window_frame.start_bound)?;
- window_frame.end_bound =
- coerce_frame_bound(&coerced_type,
&window_frame.end_bound)?;
- }
+ let mut window_frame = window_frame;
+ let current_types = expressions
+ .iter()
+ .map(|e| e.get_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ match &mut window_frame.units {
+ WindowFrameUnits::Range => {
+ let col_type = current_types.first().ok_or_else(|| {
+ DataFusionError::Internal("ORDER BY column cannot be
empty".to_string())
+ })?;
+ let coerced_type = get_coerced_type(col_type)?;
+ window_frame.start_bound =
+ coerce_frame_bound(&coerced_type, &window_frame.start_bound)?;
+ window_frame.end_bound =
+ coerce_frame_bound(&coerced_type, &window_frame.end_bound)?;
+ }
+ WindowFrameUnits::Rows | WindowFrameUnits::Groups => {
+ let coerced_type = DataType::UInt64;
+ window_frame.start_bound =
+ coerce_frame_bound(&coerced_type, &window_frame.start_bound)?;
+ window_frame.end_bound =
+ coerce_frame_bound(&coerced_type, &window_frame.end_bound)?;
}
-
- Ok(Some(window_frame))
- } else {
- Ok(None)
}
+ Ok(window_frame)
}
// Support the `IsTrue` `IsNotTrue` `IsFalse` `IsNotFalse` type coercion.
// The above op will be rewrite to the binary op when creating the physical op.
diff --git a/datafusion/physical-expr/src/window/aggregate.rs
b/datafusion/physical-expr/src/window/aggregate.rs
index f835c40a6..da2421fd9 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -41,7 +41,7 @@ pub struct AggregateWindowExpr {
aggregate: Arc<dyn AggregateExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
- window_frame: Option<Arc<WindowFrame>>,
+ window_frame: Arc<WindowFrame>,
}
impl AggregateWindowExpr {
@@ -50,7 +50,7 @@ impl AggregateWindowExpr {
aggregate: Arc<dyn AggregateExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
- window_frame: Option<Arc<WindowFrame>>,
+ window_frame: Arc<WindowFrame>,
) -> Self {
Self {
aggregate,
@@ -94,14 +94,6 @@ impl WindowExpr for AggregateWindowExpr {
self.evaluate_partition_points(batch.num_rows(),
&partition_columns)?;
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
- let (_, order_bys) = self.get_values_orderbys(batch)?;
- let window_frame = if !order_bys.is_empty() &&
self.window_frame.is_none() {
- // OVER (ORDER BY a) case
- // We create an implicit window for ORDER BY.
- Some(Arc::new(WindowFrame::default()))
- } else {
- self.window_frame.clone()
- };
let mut row_wise_results: Vec<ScalarValue> = vec![];
for partition_range in &partition_points {
let mut accumulator = self.aggregate.create_accumulator()?;
@@ -109,7 +101,7 @@ impl WindowExpr for AggregateWindowExpr {
let (values, order_bys) =
self.get_values_orderbys(&batch.slice(partition_range.start,
length))?;
- let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
+ let mut window_frame_ctx =
WindowFrameContext::new(&self.window_frame);
let mut last_range: (usize, usize) = (0, 0);
// We iterate on each row to perform a running calculation.
diff --git a/datafusion/physical-expr/src/window/built_in.rs
b/datafusion/physical-expr/src/window/built_in.rs
index 314905bae..e291e8ca5 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -37,7 +37,7 @@ pub struct BuiltInWindowExpr {
expr: Arc<dyn BuiltInWindowFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
- window_frame: Option<Arc<WindowFrame>>,
+ window_frame: Arc<WindowFrame>,
}
impl BuiltInWindowExpr {
@@ -46,7 +46,7 @@ impl BuiltInWindowExpr {
expr: Arc<dyn BuiltInWindowFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
- window_frame: Option<Arc<WindowFrame>>,
+ window_frame: Arc<WindowFrame>,
) -> Self {
Self {
expr,
@@ -98,20 +98,12 @@ impl WindowExpr for BuiltInWindowExpr {
let results = if evaluator.uses_window_frame() {
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
- let (_, order_bys) = self.get_values_orderbys(batch)?;
- let window_frame = if !order_bys.is_empty() &&
self.window_frame.is_none() {
- // OVER (ORDER BY a) case
- // We create an implicit window for ORDER BY.
- Some(Arc::new(WindowFrame::default()))
- } else {
- self.window_frame.clone()
- };
let mut row_wise_results = vec![];
for partition_range in &partition_points {
let length = partition_range.end - partition_range.start;
let (values, order_bys) = self
.get_values_orderbys(&batch.slice(partition_range.start,
length))?;
- let mut window_frame_ctx =
WindowFrameContext::new(&window_frame);
+ let mut window_frame_ctx =
WindowFrameContext::new(&self.window_frame);
// We iterate on each row to calculate window frame range and
and window function result
for idx in 0..length {
let range = window_frame_ctx.calculate_range(
diff --git a/datafusion/physical-expr/src/window/window_frame_state.rs
b/datafusion/physical-expr/src/window/window_frame_state.rs
index fc8d7d03a..307ea9144 100644
--- a/datafusion/physical-expr/src/window/window_frame_state.rs
+++ b/datafusion/physical-expr/src/window/window_frame_state.rs
@@ -43,26 +43,21 @@ pub enum WindowFrameContext<'a> {
window_frame: &'a Arc<WindowFrame>,
state: WindowFrameStateGroups,
},
- Default,
}
impl<'a> WindowFrameContext<'a> {
/// Create a new default state for the given window frame.
- pub fn new(window_frame: &'a Option<Arc<WindowFrame>>) -> Self {
- if let Some(window_frame) = window_frame {
- match window_frame.units {
- WindowFrameUnits::Rows =>
WindowFrameContext::Rows(window_frame),
- WindowFrameUnits::Range => WindowFrameContext::Range {
- window_frame,
- state: WindowFrameStateRange::default(),
- },
- WindowFrameUnits::Groups => WindowFrameContext::Groups {
- window_frame,
- state: WindowFrameStateGroups::default(),
- },
- }
- } else {
- WindowFrameContext::Default
+ pub fn new(window_frame: &'a Arc<WindowFrame>) -> Self {
+ match window_frame.units {
+ WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
+ WindowFrameUnits::Range => WindowFrameContext::Range {
+ window_frame,
+ state: WindowFrameStateRange::default(),
+ },
+ WindowFrameUnits::Groups => WindowFrameContext::Groups {
+ window_frame,
+ state: WindowFrameStateGroups::default(),
+ },
}
}
@@ -96,7 +91,6 @@ impl<'a> WindowFrameContext<'a> {
window_frame,
ref mut state,
} => state.calculate_range(window_frame, range_columns, length,
idx),
- WindowFrameContext::Default => Ok((0, length)),
}
}
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 9ec28f223..e529626a6 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -573,9 +573,7 @@ message WindowExprNode {
repeated LogicalExprNode partition_by = 5;
repeated LogicalExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
- oneof window_frame {
- WindowFrame frame = 8;
- }
+ WindowFrame window_frame = 8;
}
message BetweenNode {
@@ -1366,4 +1364,4 @@ message ExecutorStatus {
string dead = 2;
string unknown = 3;
}
-}
\ No newline at end of file
+}
diff --git a/datafusion/proto/src/from_proto.rs
b/datafusion/proto/src/from_proto.rs
index 95d605d37..935dd4f44 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -773,19 +773,17 @@ pub fn parse_expr(
let window_frame = expr
.window_frame
.as_ref()
- .map::<Result<WindowFrame, _>, _>(|e| match e {
- window_expr_node::WindowFrame::Frame(frame) => {
- let window_frame: WindowFrame =
frame.clone().try_into()?;
- if WindowFrameUnits::Range == window_frame.units
- && order_by.len() != 1
- {
- Err(proto_error("With window frame of type RANGE,
the order by expression must be of length 1"))
- } else {
- Ok(window_frame)
- }
+ .map::<Result<WindowFrame, _>, _>(|window_frame| {
+ let window_frame: WindowFrame =
window_frame.clone().try_into()?;
+ if WindowFrameUnits::Range == window_frame.units
+ && order_by.len() != 1
+ {
+ Err(proto_error("With window frame of type RANGE, the
order by expression must be of length 1"))
+ } else {
+ Ok(window_frame)
}
})
- .transpose()?;
+
.transpose()?.ok_or_else(||{DataFusionError::Execution("expects
somothing".to_string())})?;
match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 22964281a..8c98b76c0 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -21340,10 +21340,10 @@ impl serde::Serialize for WindowExprNode {
if !self.order_by.is_empty() {
len += 1;
}
- if self.window_function.is_some() {
+ if self.window_frame.is_some() {
len += 1;
}
- if self.window_frame.is_some() {
+ if self.window_function.is_some() {
len += 1;
}
let mut struct_ser =
serializer.serialize_struct("datafusion.WindowExprNode", len)?;
@@ -21356,6 +21356,9 @@ impl serde::Serialize for WindowExprNode {
if !self.order_by.is_empty() {
struct_ser.serialize_field("orderBy", &self.order_by)?;
}
+ if let Some(v) = self.window_frame.as_ref() {
+ struct_ser.serialize_field("windowFrame", v)?;
+ }
if let Some(v) = self.window_function.as_ref() {
match v {
window_expr_node::WindowFunction::AggrFunction(v) => {
@@ -21370,13 +21373,6 @@ impl serde::Serialize for WindowExprNode {
}
}
}
- if let Some(v) = self.window_frame.as_ref() {
- match v {
- window_expr_node::WindowFrame::Frame(v) => {
- struct_ser.serialize_field("frame", v)?;
- }
- }
- }
struct_ser.end()
}
}
@@ -21392,11 +21388,12 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
"partitionBy",
"order_by",
"orderBy",
+ "window_frame",
+ "windowFrame",
"aggr_function",
"aggrFunction",
"built_in_function",
"builtInFunction",
- "frame",
];
#[allow(clippy::enum_variant_names)]
@@ -21404,9 +21401,9 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
Expr,
PartitionBy,
OrderBy,
+ WindowFrame,
AggrFunction,
BuiltInFunction,
- Frame,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -21431,9 +21428,9 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
"expr" => Ok(GeneratedField::Expr),
"partitionBy" | "partition_by" =>
Ok(GeneratedField::PartitionBy),
"orderBy" | "order_by" =>
Ok(GeneratedField::OrderBy),
+ "windowFrame" | "window_frame" =>
Ok(GeneratedField::WindowFrame),
"aggrFunction" | "aggr_function" =>
Ok(GeneratedField::AggrFunction),
"builtInFunction" | "built_in_function" =>
Ok(GeneratedField::BuiltInFunction),
- "frame" => Ok(GeneratedField::Frame),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -21456,8 +21453,8 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
let mut expr__ = None;
let mut partition_by__ = None;
let mut order_by__ = None;
- let mut window_function__ = None;
let mut window_frame__ = None;
+ let mut window_function__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Expr => {
@@ -21478,6 +21475,12 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
}
order_by__ = Some(map.next_value()?);
}
+ GeneratedField::WindowFrame => {
+ if window_frame__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("windowFrame"));
+ }
+ window_frame__ = map.next_value()?;
+ }
GeneratedField::AggrFunction => {
if window_function__.is_some() {
return
Err(serde::de::Error::duplicate_field("aggrFunction"));
@@ -21490,21 +21493,14 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
}
window_function__ =
map.next_value::<::std::option::Option<BuiltInWindowFunction>>()?.map(|x|
window_expr_node::WindowFunction::BuiltInFunction(x as i32));
}
- GeneratedField::Frame => {
- if window_frame__.is_some() {
- return
Err(serde::de::Error::duplicate_field("frame"));
- }
- window_frame__ =
map.next_value::<::std::option::Option<_>>()?.map(window_expr_node::WindowFrame::Frame)
-;
- }
}
}
Ok(WindowExprNode {
expr: expr__,
partition_by: partition_by__.unwrap_or_default(),
order_by: order_by__.unwrap_or_default(),
- window_function: window_function__,
window_frame: window_frame__,
+ window_function: window_function__,
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 6350e20dc..ea281b717 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -635,11 +635,11 @@ pub struct WindowExprNode {
pub partition_by: ::prost::alloc::vec::Vec<LogicalExprNode>,
#[prost(message, repeated, tag = "6")]
pub order_by: ::prost::alloc::vec::Vec<LogicalExprNode>,
+ /// repeated LogicalExprNode filter = 7;
+ #[prost(message, optional, tag = "8")]
+ pub window_frame: ::core::option::Option<WindowFrame>,
#[prost(oneof = "window_expr_node::WindowFunction", tags = "1, 2")]
pub window_function:
::core::option::Option<window_expr_node::WindowFunction>,
- /// repeated LogicalExprNode filter = 7;
- #[prost(oneof = "window_expr_node::WindowFrame", tags = "8")]
- pub window_frame: ::core::option::Option<window_expr_node::WindowFrame>,
}
/// Nested message and enum types in `WindowExprNode`.
pub mod window_expr_node {
@@ -651,12 +651,6 @@ pub mod window_expr_node {
#[prost(enumeration = "super::BuiltInWindowFunction", tag = "2")]
BuiltInFunction(i32),
}
- /// repeated LogicalExprNode filter = 7;
- #[derive(Clone, PartialEq, ::prost::Oneof)]
- pub enum WindowFrame {
- #[prost(message, tag = "8")]
- Frame(super::WindowFrame),
- }
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BetweenNode {
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 12c2a5e78..7847ae068 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -1345,7 +1345,7 @@ mod roundtrip_tests {
args: vec![],
partition_by: vec![col("col1")],
order_by: vec![col("col2")],
- window_frame: None,
+ window_frame: WindowFrame::new(true),
};
// 2. with default window_frame
@@ -1356,7 +1356,7 @@ mod roundtrip_tests {
args: vec![],
partition_by: vec![col("col1")],
order_by: vec![col("col2")],
- window_frame: Some(WindowFrame::default()),
+ window_frame: WindowFrame::new(true),
};
// 3. with window_frame with row numbers
@@ -1373,7 +1373,7 @@ mod roundtrip_tests {
args: vec![],
partition_by: vec![col("col1")],
order_by: vec![col("col2")],
- window_frame: Some(range_number_frame),
+ window_frame: range_number_frame,
};
// 4. test with AggregateFunction
@@ -1388,7 +1388,7 @@ mod roundtrip_tests {
args: vec![col("col1")],
partition_by: vec![col("col1")],
order_by: vec![col("col2")],
- window_frame: Some(row_number_frame),
+ window_frame: row_number_frame,
};
roundtrip_expr_test(test_expr1, ctx.clone());
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 34db3c3cd..ee42c304e 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -308,7 +308,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
&[window_node_expr],
&[],
&[],
- Some(Arc::new(WindowFrame::default())),
+ Arc::new(WindowFrame::new(false)),
&physical_schema,
)?)
}
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index 92e4214b8..8dc72a1f2 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -562,12 +562,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
- let window_frame = match window_frame {
- Some(frame) => Some(
-
protobuf::window_expr_node::WindowFrame::Frame(frame.try_into()?)
- ),
- None => None
- };
+ let window_frame: Option<protobuf::WindowFrame> =
Some(window_frame.try_into()?);
let window_expr = Box::new(protobuf::WindowExprNode {
expr: arg_expr,
window_function: Some(window_function),
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index c2ea7001b..a126e8ac8 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -2235,6 +2235,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
})
.transpose()?;
+ let window_frame = if let Some(window_frame) =
window_frame {
+ window_frame
+ } else {
+ WindowFrame::new(!order_by.is_empty())
+ };
let fun = WindowFunction::from_str(&name)?;
match fun {
WindowFunction::AggregateFunction(
@@ -4778,8 +4783,8 @@ mod tests {
fn empty_over() {
let sql = "SELECT order_id, MAX(order_id) OVER () from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.order_id)\
- \n WindowAggr: windowExpr=[[MAX(orders.order_id)]]\
+ Projection: orders.order_id, MAX(orders.order_id) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[MAX(orders.order_id) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4788,8 +4793,8 @@ mod tests {
fn empty_over_with_alias() {
let sql = "SELECT order_id oid, MAX(order_id) OVER () max_oid from
orders";
let expected = "\
- Projection: orders.order_id AS oid, MAX(orders.order_id) AS max_oid\
- \n WindowAggr: windowExpr=[[MAX(orders.order_id)]]\
+ Projection: orders.order_id AS oid, MAX(orders.order_id) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max_oid\
+ \n WindowAggr: windowExpr=[[MAX(orders.order_id) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4798,8 +4803,8 @@ mod tests {
fn empty_over_dup_with_alias() {
let sql = "SELECT order_id oid, MAX(order_id) OVER () max_oid,
MAX(order_id) OVER () max_oid_dup from orders";
let expected = "\
- Projection: orders.order_id AS oid, MAX(orders.order_id) AS max_oid,
MAX(orders.order_id) AS max_oid_dup\
- \n WindowAggr: windowExpr=[[MAX(orders.order_id)]]\
+ Projection: orders.order_id AS oid, MAX(orders.order_id) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max_oid, MAX(orders.order_id)
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max_oid_dup\
+ \n WindowAggr: windowExpr=[[MAX(orders.order_id) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4808,9 +4813,9 @@ mod tests {
fn empty_over_dup_with_different_sort() {
let sql = "SELECT order_id oid, MAX(order_id) OVER (), MAX(order_id)
OVER (ORDER BY order_id) from orders";
let expected = "\
- Projection: orders.order_id AS oid, MAX(orders.order_id),
MAX(orders.order_id) ORDER BY [orders.order_id ASC NULLS LAST]\
- \n WindowAggr: windowExpr=[[MAX(orders.order_id)]]\
- \n WindowAggr: windowExpr=[[MAX(orders.order_id) ORDER BY
[orders.order_id ASC NULLS LAST]]]\
+ Projection: orders.order_id AS oid, MAX(orders.order_id) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MAX(orders.order_id) ORDER BY
[orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW\
+ \n WindowAggr: windowExpr=[[MAX(orders.order_id) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
+ \n WindowAggr: windowExpr=[[MAX(orders.order_id) ORDER BY
[orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4819,8 +4824,8 @@ mod tests {
fn empty_over_plus() {
let sql = "SELECT order_id, MAX(qty * 1.1) OVER () from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty * Float64(1.1))\
- \n WindowAggr: windowExpr=[[MAX(orders.qty * Float64(1.1))]]\
+ Projection: orders.order_id, MAX(orders.qty * Float64(1.1)) ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty * Float64(1.1)) ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4830,8 +4835,8 @@ mod tests {
let sql =
"SELECT order_id, MAX(qty) OVER (), min(qty) over (), aVg(qty)
OVER () from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty), MIN(orders.qty),
AVG(orders.qty)\
- \n WindowAggr: windowExpr=[[MAX(orders.qty), MIN(orders.qty),
AVG(orders.qty)]]\
+ Projection: orders.order_id, MAX(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING, MIN(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING, AVG(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING, MIN(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING, AVG(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4849,8 +4854,8 @@ mod tests {
fn over_partition_by() {
let sql = "SELECT order_id, MAX(qty) OVER (PARTITION BY order_id) from
orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id]]]\
+ Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4871,9 +4876,9 @@ mod tests {
fn over_order_by() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id),
MIN(qty) OVER (ORDER BY order_id DESC) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST], MIN(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST]]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id DESC NULLS FIRST]]]\
+ Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
MIN(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4882,9 +4887,9 @@ mod tests {
fn over_order_by_with_window_frame_double_end() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id ROWS
BETWEEN 3 PRECEDING and 3 FOLLOWING), MIN(qty) OVER (ORDER BY order_id DESC)
from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING, MIN(orders.qty) ORDER
BY [orders.order_id DESC NULLS FIRST]\
+ Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING, MIN(orders.qty) ORDER
BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW\
\n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id DESC NULLS FIRST]]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4893,9 +4898,9 @@ mod tests {
fn over_order_by_with_window_frame_single_end() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id ROWS 3
PRECEDING), MIN(qty) OVER (ORDER BY order_id DESC) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND CURRENT ROW, MIN(orders.qty) ORDER
BY [orders.order_id DESC NULLS FIRST]\
+ Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND CURRENT ROW, MIN(orders.qty) ORDER
BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW\
\n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND CURRENT ROW]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id DESC NULLS FIRST]]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4926,9 +4931,9 @@ mod tests {
fn over_order_by_with_window_frame_single_end_groups() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id GROUPS 3
PRECEDING), MIN(qty) OVER (ORDER BY order_id DESC) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] GROUPS BETWEEN 3 PRECEDING AND CURRENT ROW, MIN(orders.qty)
ORDER BY [orders.order_id DESC NULLS FIRST]\
+ Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] GROUPS BETWEEN 3 PRECEDING AND CURRENT ROW, MIN(orders.qty)
ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW\
\n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] GROUPS BETWEEN 3 PRECEDING AND CURRENT ROW]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id DESC NULLS FIRST]]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4949,9 +4954,9 @@ mod tests {
fn over_order_by_two_sort_keys() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id),
MIN(qty) OVER (ORDER BY (order_id + 1)) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST], MIN(orders.qty) ORDER BY [orders.order_id + Int64(1) ASC NULLS
LAST]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST]]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id + Int64(1) ASC NULLS LAST]]]\
+ Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
MIN(orders.qty) ORDER BY [orders.order_id + Int64(1) ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id + Int64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4973,10 +4978,10 @@ mod tests {
fn over_order_by_sort_keys_sorting() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY qty, order_id),
SUM(qty) OVER (), MIN(qty) OVER (ORDER BY order_id, qty) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.qty ASC
NULLS LAST, orders.order_id ASC NULLS LAST], SUM(orders.qty), MIN(orders.qty)
ORDER BY [orders.order_id ASC NULLS LAST, orders.qty ASC NULLS LAST]\
- \n WindowAggr: windowExpr=[[SUM(orders.qty)]]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.qty
ASC NULLS LAST, orders.order_id ASC NULLS LAST]]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id ASC NULLS LAST, orders.qty ASC NULLS LAST]]]\
+ Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.qty ASC
NULLS LAST, orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, SUM(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING, MIN(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST, orders.qty
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[SUM(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.qty
ASC NULLS LAST, orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id ASC NULLS LAST, orders.qty ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -4998,10 +5003,10 @@ mod tests {
fn over_order_by_sort_keys_sorting_prefix_compacting() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id),
SUM(qty) OVER (), MIN(qty) OVER (ORDER BY order_id, qty) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST], SUM(orders.qty), MIN(orders.qty) ORDER BY [orders.order_id ASC
NULLS LAST, orders.qty ASC NULLS LAST]\
- \n WindowAggr: windowExpr=[[SUM(orders.qty)]]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY
[orders.order_id ASC NULLS LAST]]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id ASC NULLS LAST, orders.qty ASC NULLS LAST]]]\
+ Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.order_id
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
SUM(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING,
MIN(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST, orders.qty ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[SUM(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY
[orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id ASC NULLS LAST, orders.qty ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -5027,10 +5032,10 @@ mod tests {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY qty, order_id),
SUM(qty) OVER (), MIN(qty) OVER (ORDER BY order_id, qty) from orders ORDER BY
order_id";
let expected = "\
Sort: orders.order_id ASC NULLS LAST\
- \n Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.qty
ASC NULLS LAST, orders.order_id ASC NULLS LAST], SUM(orders.qty),
MIN(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST, orders.qty ASC NULLS
LAST]\
- \n WindowAggr: windowExpr=[[SUM(orders.qty)]]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.qty
ASC NULLS LAST, orders.order_id ASC NULLS LAST]]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id ASC NULLS LAST, orders.qty ASC NULLS LAST]]]\
+ \n Projection: orders.order_id, MAX(orders.qty) ORDER BY [orders.qty
ASC NULLS LAST, orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW, SUM(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING, MIN(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST,
orders.qty ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[SUM(orders.qty) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) ORDER BY [orders.qty
ASC NULLS LAST, orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) ORDER BY
[orders.order_id ASC NULLS LAST, orders.qty ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -5049,8 +5054,8 @@ mod tests {
let sql =
"SELECT order_id, MAX(qty) OVER (PARTITION BY order_id ORDER BY
qty) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id] ORDER BY [orders.qty ASC NULLS LAST]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id] ORDER BY [orders.qty ASC NULLS LAST]]]\
+ Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id] ORDER BY [orders.qty ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id] ORDER BY [orders.qty ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -5069,8 +5074,8 @@ mod tests {
let sql =
"SELECT order_id, MAX(qty) OVER (PARTITION BY order_id, qty ORDER
BY qty) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.qty ASC NULLS LAST]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.qty ASC NULLS LAST]]]\
+ Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.qty ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.qty ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -5092,9 +5097,9 @@ mod tests {
let sql =
"SELECT order_id, MAX(qty) OVER (PARTITION BY order_id, qty ORDER
BY qty), MIN(qty) OVER (PARTITION BY qty ORDER BY order_id) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.qty ASC NULLS LAST],
MIN(orders.qty) PARTITION BY [orders.qty] ORDER BY [orders.order_id ASC NULLS
LAST]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) PARTITION BY [orders.qty]
ORDER BY [orders.order_id ASC NULLS LAST]]]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.qty ASC NULLS LAST]]]\
+ Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.qty ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, MIN(orders.qty) PARTITION BY
[orders.qty] ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) PARTITION BY [orders.qty]
ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.qty ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -5115,9 +5120,9 @@ mod tests {
let sql =
"SELECT order_id, MAX(qty) OVER (PARTITION BY order_id ORDER BY
qty), MIN(qty) OVER (PARTITION BY order_id, qty ORDER BY price) from orders";
let expected = "\
- Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id] ORDER BY [orders.qty ASC NULLS LAST], MIN(orders.qty)
PARTITION BY [orders.order_id, orders.qty] ORDER BY [orders.price ASC NULLS
LAST]\
- \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id] ORDER BY [orders.qty ASC NULLS LAST]]]\
- \n WindowAggr: windowExpr=[[MIN(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.price ASC NULLS LAST]]]\
+ Projection: orders.order_id, MAX(orders.qty) PARTITION BY
[orders.order_id] ORDER BY [orders.qty ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW, MIN(orders.qty) PARTITION BY [orders.order_id,
orders.qty] ORDER BY [orders.price ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW\
+ \n WindowAggr: windowExpr=[[MAX(orders.qty) PARTITION BY
[orders.order_id] ORDER BY [orders.qty ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]\
+ \n WindowAggr: windowExpr=[[MIN(orders.qty) PARTITION BY
[orders.order_id, orders.qty] ORDER BY [orders.price ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -5127,8 +5132,8 @@ mod tests {
let sql =
"SELECT order_id, APPROX_MEDIAN(qty) OVER(PARTITION BY order_id)
from orders";
let expected = "\
- Projection: orders.order_id, APPROXMEDIAN(orders.qty) PARTITION BY
[orders.order_id]\
- \n WindowAggr: windowExpr=[[APPROXMEDIAN(orders.qty) PARTITION BY
[orders.order_id]]]\
+ Projection: orders.order_id, APPROXMEDIAN(orders.qty) PARTITION BY
[orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[APPROXMEDIAN(orders.qty) PARTITION BY
[orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: orders";
quick_test(sql, expected);
}
@@ -5616,8 +5621,8 @@ mod tests {
from
person
group by rollup(state, last_name)";
- let expected = "Projection: SUM(person.age) AS total_sum,
person.state, person.last_name, GROUPING(person.state) +
GROUPING(person.last_name) AS x, RANK() PARTITION BY [GROUPING(person.state) +
GROUPING(person.last_name), CASE WHEN GROUPING(person.last_name) = Int64(0)
THEN person.state END] ORDER BY [SUM(person.age) DESC NULLS FIRST] AS the_rank\
- \n WindowAggr: windowExpr=[[RANK() PARTITION BY
[GROUPING(person.state) + GROUPING(person.last_name), CASE WHEN
GROUPING(person.last_name) = Int64(0) THEN person.state END] ORDER BY
[SUM(person.age) DESC NULLS FIRST]]]\
+ let expected = "Projection: SUM(person.age) AS total_sum,
person.state, person.last_name, GROUPING(person.state) +
GROUPING(person.last_name) AS x, RANK() PARTITION BY [GROUPING(person.state) +
GROUPING(person.last_name), CASE WHEN GROUPING(person.last_name) = Int64(0)
THEN person.state END] ORDER BY [SUM(person.age) DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS the_rank\
+ \n WindowAggr: windowExpr=[[RANK() PARTITION BY
[GROUPING(person.state) + GROUPING(person.last_name), CASE WHEN
GROUPING(person.last_name) = Int64(0) THEN person.state END] ORDER BY
[SUM(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]\
\n Aggregate: groupBy=[[ROLLUP (person.state, person.last_name)]],
aggr=[[SUM(person.age), GROUPING(person.state), GROUPING(person.last_name)]]\
\n TableScan: person";
quick_test(sql, expected);