This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new efe5708978 Convert `BuiltInWindowFunction::{Lead, Lag}` to a user
defined window function (#12857)
efe5708978 is described below
commit efe5708978a480d11d5406a7d7df76d73e15c5d7
Author: jcsherin <[email protected]>
AuthorDate: Fri Oct 18 16:56:41 2024 +0530
Convert `BuiltInWindowFunction::{Lead, Lag}` to a user defined window
function (#12857)
* Move `lead-lag` to `functions-window` package
* Builds with warnings
* Adds `PartitionEvaluatorArgs`
* Extracts `shift_offset` from input expressions
* Computes shift offset
* Get default value from input expression
* Implements `partition_evaluator`
* Fixes compiler warnings
* Comments out failing tests
* Fixes `cargo test` errors and warnings
* Minor: taplo formatting
* Delete code
* Define `lead`, `lag` user-defined window functions
* Fixes `cargo build` errors
* Export udwf and expression public APIs
* Mark result field as nullable
* Delete `return_type` tests for `lead` and `lag`
* Disables test: window function case insensitive
* Fixes: lowercase name in logical plan
* Reverts to old methods for computing `shift_offset`, `default_value`
* Implements expression reversal
* Fixes: lowercase name in logical plans
* Fixes: doc test compilation errors
Fixes: doc test build errors
* Temporarily quiet clippy errors
* Fixes proto definition
* Minor: fixes formatting
* Fixes: doc tests
* Uses macro for defining `lag_udwf()` and `lead_udwf()`
* Fixes: window fuzz test cases
* Copies doc comments verbatim from `BuiltInWindowFunction` enum
* Deletes from window function case insensitive test
* Deletes `BuiltInWindowFunction` expression APIs
* Delete from `create_built_in_window_expr`
* Deletes proto serialization
* Delete from `BuiltInWindowFunction` enum
* Deletes test for finding built-in window function
* Fixes build errors + deletes redundant code
* Deletes more code
* Delete unnecessary structs
* Refactors shift offset computation
* Passes range unit test
* Fixes: clippy::get-first error
* Rewrite unit tests for WindowUDF
* Fixes: unit test for lag with default value
* Consistent input expressions and data types in unit tests
* Minor: fixes formatting
* Restore original helper method for unit tests
* Revert "Refactors shift offset computation"
This reverts commit 000ceb76409e66230f9c5017a30fa3c9bb1e6575.
* Moves helper functions into `functions-window-common` package
* Uses common helper functions in `{lead, lag}`
* Minor: formatting
* Revert "Moves helper functions into `functions-window-common` package"
This reverts commit ab8a83c9c11ca3a245278f6f300438feaacb0978.
* Moves common functions to utils
* Minor: formatting fixes
* Update lowercase names in explain output
* Adds doc for `lead()` and `lag()` expression functions
* Add doc for `WindowShiftKind::shift_offset`
* Remove `arrow` dev dependency
* Minor: formatting
* Update inner doc comment
* Serialize 1 or more window function arguments
* Adds logical plan roundtrip test cases
* Refactor: readability of unit tests
* Minor: rename variable bindings
* Minor: copy edit
* Revert "Remove `arrow` dev dependency"
This reverts commit 3eb09856c8ec4ddce20472deee2df590c2fd3f35.
* Move null argument handling helper to utils
* Disable failing sqllogic tests for handling NULL input
* Revert "Disable failing sqllogic tests for handling NULL input"
This reverts commit 270a2030637012d549c001e973a0a1bb6b3d4dd0.
* Fixes: incorrect NULL handling in `lead`/`lag` window function
* Adds more test cases
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion-cli/Cargo.lock | 1 +
datafusion/core/tests/fuzz_cases/window_fuzz.rs | 13 +-
datafusion/expr/src/built_in_window_function.rs | 32 +-
datafusion/expr/src/expr.rs | 38 --
datafusion/expr/src/udwf.rs | 23 ++
datafusion/expr/src/window_function.rs | 34 --
datafusion/functions-window-common/src/expr.rs | 64 ++++
datafusion/functions-window-common/src/lib.rs | 1 +
datafusion/functions-window/Cargo.toml | 1 +
.../window => functions-window/src}/lead_lag.rs | 392 ++++++++++++++-------
datafusion/functions-window/src/lib.rs | 8 +
datafusion/functions-window/src/utils.rs | 53 +++
datafusion/physical-expr/src/expressions/mod.rs | 1 -
datafusion/physical-expr/src/window/mod.rs | 1 -
datafusion/physical-plan/src/windows/mod.rs | 88 +----
datafusion/proto/proto/datafusion.proto | 6 +-
datafusion/proto/src/generated/pbjson.rs | 30 +-
datafusion/proto/src/generated/prost.rs | 14 +-
datafusion/proto/src/logical_plan/from_proto.rs | 17 +-
datafusion/proto/src/logical_plan/to_proto.rs | 14 +-
datafusion/proto/src/physical_plan/to_proto.rs | 20 --
.../proto/tests/cases/roundtrip_logical_plan.rs | 12 +-
datafusion/sqllogictest/test_files/union.slt | 8 +-
datafusion/sqllogictest/test_files/window.slt | 56 ++-
24 files changed, 520 insertions(+), 407 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index aa64e14fca..dfd07a7658 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1445,6 +1445,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-functions-window-common",
+ "datafusion-physical-expr",
"datafusion-physical-expr-common",
"log",
"paste",
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 4a33334770..d649919f1b 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -45,6 +45,7 @@ use datafusion_physical_expr::{PhysicalExpr,
PhysicalSortExpr};
use test_utils::add_empty_batches;
use datafusion::functions_window::row_number::row_number_udwf;
+use datafusion_functions_window::lead_lag::{lag_udwf, lead_udwf};
use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf};
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
@@ -197,7 +198,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
-
WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Lag),
+ WindowFunctionDefinition::WindowUDF(lag_udwf()),
// its name
"LAG",
// no argument
@@ -211,7 +212,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
-
WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Lead),
+ WindowFunctionDefinition::WindowUDF(lead_udwf()),
// its name
"LEAD",
// no argument
@@ -393,9 +394,7 @@ fn get_random_function(
window_fn_map.insert(
"lead",
(
- WindowFunctionDefinition::BuiltInWindowFunction(
- BuiltInWindowFunction::Lead,
- ),
+ WindowFunctionDefinition::WindowUDF(lead_udwf()),
vec![
arg.clone(),
lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))),
@@ -406,9 +405,7 @@ fn get_random_function(
window_fn_map.insert(
"lag",
(
- WindowFunctionDefinition::BuiltInWindowFunction(
- BuiltInWindowFunction::Lag,
- ),
+ WindowFunctionDefinition::WindowUDF(lag_udwf()),
vec![
arg.clone(),
lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))),
diff --git a/datafusion/expr/src/built_in_window_function.rs
b/datafusion/expr/src/built_in_window_function.rs
index 6a30080fb3..2c70a07a4e 100644
--- a/datafusion/expr/src/built_in_window_function.rs
+++ b/datafusion/expr/src/built_in_window_function.rs
@@ -22,7 +22,7 @@ use std::str::FromStr;
use crate::type_coercion::functions::data_types;
use crate::utils;
-use crate::{Signature, TypeSignature, Volatility};
+use crate::{Signature, Volatility};
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError,
Result};
use arrow::datatypes::DataType;
@@ -44,17 +44,7 @@ pub enum BuiltInWindowFunction {
CumeDist,
/// Integer ranging from 1 to the argument value, dividing the partition
as equally as possible
Ntile,
- /// Returns value evaluated at the row that is offset rows before the
current row within the partition;
- /// If there is no such row, instead return default (which must be of the
same type as value).
- /// Both offset and default are evaluated with respect to the current row.
- /// If omitted, offset defaults to 1 and default to null
- Lag,
- /// Returns value evaluated at the row that is offset rows after the
current row within the partition;
- /// If there is no such row, instead return default (which must be of the
same type as value).
- /// Both offset and default are evaluated with respect to the current row.
- /// If omitted, offset defaults to 1 and default to null
- Lead,
- /// Returns value evaluated at the row that is the first row of the window
frame
+ /// returns value evaluated at the row that is the first row of the window
frame
FirstValue,
/// Returns value evaluated at the row that is the last row of the window
frame
LastValue,
@@ -68,8 +58,6 @@ impl BuiltInWindowFunction {
match self {
CumeDist => "CUME_DIST",
Ntile => "NTILE",
- Lag => "LAG",
- Lead => "LEAD",
FirstValue => "first_value",
LastValue => "last_value",
NthValue => "NTH_VALUE",
@@ -83,8 +71,6 @@ impl FromStr for BuiltInWindowFunction {
Ok(match name.to_uppercase().as_str() {
"CUME_DIST" => BuiltInWindowFunction::CumeDist,
"NTILE" => BuiltInWindowFunction::Ntile,
- "LAG" => BuiltInWindowFunction::Lag,
- "LEAD" => BuiltInWindowFunction::Lead,
"FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
"LAST_VALUE" => BuiltInWindowFunction::LastValue,
"NTH_VALUE" => BuiltInWindowFunction::NthValue,
@@ -117,9 +103,7 @@ impl BuiltInWindowFunction {
match self {
BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::CumeDist => Ok(DataType::Float64),
- BuiltInWindowFunction::Lag
- | BuiltInWindowFunction::Lead
- | BuiltInWindowFunction::FirstValue
+ BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue
| BuiltInWindowFunction::NthValue =>
Ok(input_expr_types[0].clone()),
}
@@ -130,16 +114,6 @@ impl BuiltInWindowFunction {
// Note: The physical expression must accept the type returned by this
function or the execution panics.
match self {
BuiltInWindowFunction::CumeDist => Signature::any(0,
Volatility::Immutable),
- BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => {
- Signature::one_of(
- vec![
- TypeSignature::Any(1),
- TypeSignature::Any(2),
- TypeSignature::Any(3),
- ],
- Volatility::Immutable,
- )
- }
BuiltInWindowFunction::FirstValue |
BuiltInWindowFunction::LastValue => {
Signature::any(1, Volatility::Immutable)
}
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 3e692189e4..f3f71a8727 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -2560,30 +2560,6 @@ mod test {
Ok(())
}
- #[test]
- fn test_lead_return_type() -> Result<()> {
- let fun = find_df_window_func("lead").unwrap();
- let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
- assert_eq!(DataType::Utf8, observed);
-
- let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
- assert_eq!(DataType::Float64, observed);
-
- Ok(())
- }
-
- #[test]
- fn test_lag_return_type() -> Result<()> {
- let fun = find_df_window_func("lag").unwrap();
- let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
- assert_eq!(DataType::Utf8, observed);
-
- let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
- assert_eq!(DataType::Float64, observed);
-
- Ok(())
- }
-
#[test]
fn test_nth_value_return_type() -> Result<()> {
let fun = find_df_window_func("nth_value").unwrap();
@@ -2621,8 +2597,6 @@ mod test {
let names = vec![
"cume_dist",
"ntile",
- "lag",
- "lead",
"first_value",
"last_value",
"nth_value",
@@ -2660,18 +2634,6 @@ mod test {
built_in_window_function::BuiltInWindowFunction::LastValue
))
);
- assert_eq!(
- find_df_window_func("LAG"),
- Some(WindowFunctionDefinition::BuiltInWindowFunction(
- built_in_window_function::BuiltInWindowFunction::Lag
- ))
- );
- assert_eq!(
- find_df_window_func("LEAD"),
- Some(WindowFunctionDefinition::BuiltInWindowFunction(
- built_in_window_function::BuiltInWindowFunction::Lead
- ))
- );
assert_eq!(find_df_window_func("not_exist"), None)
}
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index 6d8f2be97e..6ab94c1e84 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -34,8 +34,10 @@ use crate::{
Signature,
};
use datafusion_common::{not_impl_err, Result};
+use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// Logical representation of a user-defined window function (UDWF)
/// A UDWF is different from a UDF in that it is stateful across batches.
@@ -149,6 +151,12 @@ impl WindowUDF {
self.inner.simplify()
}
+ /// Expressions that are passed to the [`PartitionEvaluator`].
+ ///
+ /// See [`WindowUDFImpl::expressions`] for more details.
+ pub fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn
PhysicalExpr>> {
+ self.inner.expressions(expr_args)
+ }
/// Return a `PartitionEvaluator` for evaluating this window function
pub fn partition_evaluator_factory(
&self,
@@ -302,6 +310,14 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
/// types are accepted and the function's Volatility.
fn signature(&self) -> &Signature;
+ /// Returns the expressions that are passed to the [`PartitionEvaluator`].
+ fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn
PhysicalExpr>> {
+ expr_args
+ .input_exprs()
+ .first()
+ .map_or(vec![], |expr| vec![Arc::clone(expr)])
+ }
+
/// Invoke the function, returning the [`PartitionEvaluator`] instance
fn partition_evaluator(
&self,
@@ -480,6 +496,13 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
self.inner.signature()
}
+ fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn
PhysicalExpr>> {
+ expr_args
+ .input_exprs()
+ .first()
+ .map_or(vec![], |expr| vec![Arc::clone(expr)])
+ }
+
fn partition_evaluator(
&self,
partition_evaluator_args: PartitionEvaluatorArgs,
diff --git a/datafusion/expr/src/window_function.rs
b/datafusion/expr/src/window_function.rs
index 7ac6fb7d16..3e1870c59c 100644
--- a/datafusion/expr/src/window_function.rs
+++ b/datafusion/expr/src/window_function.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion_common::ScalarValue;
-
use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};
/// Create an expression to represent the `cume_dist` window function
@@ -29,38 +27,6 @@ pub fn ntile(arg: Expr) -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Ntile,
vec![arg]))
}
-/// Create an expression to represent the `lag` window function
-pub fn lag(
- arg: Expr,
- shift_offset: Option<i64>,
- default_value: Option<ScalarValue>,
-) -> Expr {
- let shift_offset_lit = shift_offset
- .map(|v| v.lit())
- .unwrap_or(ScalarValue::Null.lit());
- let default_lit = default_value.unwrap_or(ScalarValue::Null).lit();
- Expr::WindowFunction(WindowFunction::new(
- BuiltInWindowFunction::Lag,
- vec![arg, shift_offset_lit, default_lit],
- ))
-}
-
-/// Create an expression to represent the `lead` window function
-pub fn lead(
- arg: Expr,
- shift_offset: Option<i64>,
- default_value: Option<ScalarValue>,
-) -> Expr {
- let shift_offset_lit = shift_offset
- .map(|v| v.lit())
- .unwrap_or(ScalarValue::Null.lit());
- let default_lit = default_value.unwrap_or(ScalarValue::Null).lit();
- Expr::WindowFunction(WindowFunction::new(
- BuiltInWindowFunction::Lead,
- vec![arg, shift_offset_lit, default_lit],
- ))
-}
-
/// Create an expression to represent the `nth_value` window function
pub fn nth_value(arg: Expr, n: i64) -> Expr {
Expr::WindowFunction(WindowFunction::new(
diff --git a/datafusion/functions-window-common/src/expr.rs
b/datafusion/functions-window-common/src/expr.rs
new file mode 100644
index 0000000000..1d99fe7acf
--- /dev/null
+++ b/datafusion/functions-window-common/src/expr.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use std::sync::Arc;
+
+/// Arguments passed to user-defined window function
+#[derive(Debug, Default)]
+pub struct ExpressionArgs<'a> {
+ /// The expressions passed as arguments to the user-defined window
+ /// function.
+ input_exprs: &'a [Arc<dyn PhysicalExpr>],
+ /// The corresponding data types of expressions passed as arguments
+ /// to the user-defined window function.
+ input_types: &'a [DataType],
+}
+
+impl<'a> ExpressionArgs<'a> {
+ /// Create an instance of [`ExpressionArgs`].
+ ///
+ /// # Arguments
+ ///
+ /// * `input_exprs` - The expressions passed as arguments
+ /// to the user-defined window function.
+ /// * `input_types` - The data types corresponding to the
+ /// arguments to the user-defined window function.
+ ///
+ pub fn new(
+ input_exprs: &'a [Arc<dyn PhysicalExpr>],
+ input_types: &'a [DataType],
+ ) -> Self {
+ Self {
+ input_exprs,
+ input_types,
+ }
+ }
+
+ /// Returns the expressions passed as arguments to the user-defined
+ /// window function.
+ pub fn input_exprs(&self) -> &'a [Arc<dyn PhysicalExpr>] {
+ self.input_exprs
+ }
+
+ /// Returns the [`DataType`]s corresponding to the input expressions
+ /// to the user-defined window function.
+ pub fn input_types(&self) -> &'a [DataType] {
+ self.input_types
+ }
+}
diff --git a/datafusion/functions-window-common/src/lib.rs
b/datafusion/functions-window-common/src/lib.rs
index 53f9eb1c9a..da8d096da5 100644
--- a/datafusion/functions-window-common/src/lib.rs
+++ b/datafusion/functions-window-common/src/lib.rs
@@ -18,5 +18,6 @@
//! Common user-defined window functionality for [DataFusion]
//!
//! [DataFusion]: <https://crates.io/crates/datafusion>
+pub mod expr;
pub mod field;
pub mod partition;
diff --git a/datafusion/functions-window/Cargo.toml
b/datafusion/functions-window/Cargo.toml
index 952e5720c7..262c21fcec 100644
--- a/datafusion/functions-window/Cargo.toml
+++ b/datafusion/functions-window/Cargo.toml
@@ -41,6 +41,7 @@ path = "src/lib.rs"
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions-window-common = { workspace = true }
+datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
log = { workspace = true }
paste = "1.0.15"
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs
b/datafusion/functions-window/src/lead_lag.rs
similarity index 59%
rename from datafusion/physical-expr/src/window/lead_lag.rs
rename to datafusion/functions-window/src/lead_lag.rs
index 1656b7c303..f815210997 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/functions-window/src/lead_lag.rs
@@ -15,125 +15,275 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines physical expression for `lead` and `lag` that can evaluated
-//! at runtime during query execution
-use crate::window::BuiltInWindowFunctionExpr;
-use crate::PhysicalExpr;
-use arrow::array::ArrayRef;
-use arrow::datatypes::{DataType, Field};
-use arrow_array::Array;
+//! `lead` and `lag` window function implementations
+
+use crate::utils::{get_scalar_value_from_args, get_signed_integer};
+use datafusion_common::arrow::array::ArrayRef;
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result,
ScalarValue};
-use datafusion_expr::PartitionEvaluator;
+use datafusion_expr::{
+ Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature,
Volatility,
+ WindowUDFImpl,
+};
+use datafusion_functions_window_common::expr::ExpressionArgs;
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::any::Any;
use std::cmp::min;
use std::collections::VecDeque;
use std::ops::{Neg, Range};
use std::sync::Arc;
-/// window shift expression
+get_or_init_udwf!(
+ Lag,
+ lag,
+ "Returns the row value that precedes the current row by a specified \
+ offset within partition. If no such row exists, then returns the \
+ default value.",
+ WindowShift::lag
+);
+get_or_init_udwf!(
+ Lead,
+ lead,
+ "Returns the value from a row that follows the current row by a \
+ specified offset within the partition. If no such row exists, then \
+ returns the default value.",
+ WindowShift::lead
+);
+
+/// Create an expression to represent the `lag` window function
+///
+/// returns value evaluated at the row that is offset rows before the current
row within the partition;
+/// if there is no such row, instead return default (which must be of the same
type as value).
+/// Both offset and default are evaluated with respect to the current row.
+/// If omitted, offset defaults to 1 and default to null
+pub fn lag(
+ arg: datafusion_expr::Expr,
+ shift_offset: Option<i64>,
+ default_value: Option<ScalarValue>,
+) -> datafusion_expr::Expr {
+ let shift_offset_lit = shift_offset
+ .map(|v| v.lit())
+ .unwrap_or(ScalarValue::Null.lit());
+ let default_lit = default_value.unwrap_or(ScalarValue::Null).lit();
+
+ lag_udwf().call(vec![arg, shift_offset_lit, default_lit])
+}
+
+/// Create an expression to represent the `lead` window function
+///
+/// returns value evaluated at the row that is offset rows after the current
row within the partition;
+/// if there is no such row, instead return default (which must be of the same
type as value).
+/// Both offset and default are evaluated with respect to the current row.
+/// If omitted, offset defaults to 1 and default to null
+pub fn lead(
+ arg: datafusion_expr::Expr,
+ shift_offset: Option<i64>,
+ default_value: Option<ScalarValue>,
+) -> datafusion_expr::Expr {
+ let shift_offset_lit = shift_offset
+ .map(|v| v.lit())
+ .unwrap_or(ScalarValue::Null.lit());
+ let default_lit = default_value.unwrap_or(ScalarValue::Null).lit();
+
+ lead_udwf().call(vec![arg, shift_offset_lit, default_lit])
+}
+
#[derive(Debug)]
-pub struct WindowShift {
- name: String,
- /// Output data type
- data_type: DataType,
- shift_offset: i64,
- expr: Arc<dyn PhysicalExpr>,
- default_value: ScalarValue,
- ignore_nulls: bool,
+enum WindowShiftKind {
+ Lag,
+ Lead,
}
-impl WindowShift {
- /// Get shift_offset of window shift expression
- pub fn get_shift_offset(&self) -> i64 {
- self.shift_offset
+impl WindowShiftKind {
+ fn name(&self) -> &'static str {
+ match self {
+ WindowShiftKind::Lag => "lag",
+ WindowShiftKind::Lead => "lead",
+ }
}
- /// Get the default_value for window shift expression.
- pub fn get_default_value(&self) -> ScalarValue {
- self.default_value.clone()
+ /// In [`WindowShiftEvaluator`] a positive offset is used to signal
+ /// computation of `lag()`. So here we negate the input offset
+ /// value when computing `lead()`.
+ fn shift_offset(&self, value: Option<i64>) -> i64 {
+ match self {
+ WindowShiftKind::Lag => value.unwrap_or(1),
+ WindowShiftKind::Lead => value.map(|v| v.neg()).unwrap_or(-1),
+ }
}
}
-/// lead() window function
-pub fn lead(
- name: String,
- data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
- shift_offset: Option<i64>,
- default_value: ScalarValue,
- ignore_nulls: bool,
-) -> WindowShift {
- WindowShift {
- name,
- data_type,
- shift_offset: shift_offset.map(|v| v.neg()).unwrap_or(-1),
- expr,
- default_value,
- ignore_nulls,
- }
+/// window shift expression
+#[derive(Debug)]
+pub struct WindowShift {
+ signature: Signature,
+ kind: WindowShiftKind,
}
-/// lag() window function
-pub fn lag(
- name: String,
- data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
- shift_offset: Option<i64>,
- default_value: ScalarValue,
- ignore_nulls: bool,
-) -> WindowShift {
- WindowShift {
- name,
- data_type,
- shift_offset: shift_offset.unwrap_or(1),
- expr,
- default_value,
- ignore_nulls,
+impl WindowShift {
+ fn new(kind: WindowShiftKind) -> Self {
+ Self {
+ signature: Signature::one_of(
+ vec![
+ TypeSignature::Any(1),
+ TypeSignature::Any(2),
+ TypeSignature::Any(3),
+ ],
+ Volatility::Immutable,
+ ),
+ kind,
+ }
+ }
+
+ pub fn lag() -> Self {
+ Self::new(WindowShiftKind::Lag)
+ }
+
+ pub fn lead() -> Self {
+ Self::new(WindowShiftKind::Lead)
}
}
-impl BuiltInWindowFunctionExpr for WindowShift {
- /// Return a reference to Any that can be used for downcasting
+impl WindowUDFImpl for WindowShift {
fn as_any(&self) -> &dyn Any {
self
}
- fn field(&self) -> Result<Field> {
- let nullable = true;
- Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+ fn name(&self) -> &str {
+ self.kind.name()
}
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![Arc::clone(&self.expr)]
+ fn signature(&self) -> &Signature {
+ &self.signature
}
- fn name(&self) -> &str {
- &self.name
+ /// Handles the case where `NULL` expression is passed as an
+ /// argument to `lead`/`lag`. The type is refined depending
+ /// on the default value argument.
+ ///
+ /// For more details see:
<https://github.com/apache/datafusion/issues/12717>
+ fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn
PhysicalExpr>> {
+ parse_expr(expr_args.input_exprs(), expr_args.input_types())
+ .into_iter()
+ .collect::<Vec<_>>()
}
- fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+ fn partition_evaluator(
+ &self,
+ partition_evaluator_args: PartitionEvaluatorArgs,
+ ) -> Result<Box<dyn PartitionEvaluator>> {
+ let shift_offset =
+ get_scalar_value_from_args(partition_evaluator_args.input_exprs(),
1)?
+ .map(get_signed_integer)
+ .map_or(Ok(None), |v| v.map(Some))
+ .map(|n| self.kind.shift_offset(n))
+ .map(|offset| {
+ if partition_evaluator_args.is_reversed() {
+ -offset
+ } else {
+ offset
+ }
+ })?;
+ let default_value = parse_default_value(
+ partition_evaluator_args.input_exprs(),
+ partition_evaluator_args.input_types(),
+ )?;
+
Ok(Box::new(WindowShiftEvaluator {
- shift_offset: self.shift_offset,
- default_value: self.default_value.clone(),
- ignore_nulls: self.ignore_nulls,
+ shift_offset,
+ default_value,
+ ignore_nulls: partition_evaluator_args.ignore_nulls(),
non_null_offsets: VecDeque::new(),
}))
}
- fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
- Some(Arc::new(Self {
- name: self.name.clone(),
- data_type: self.data_type.clone(),
- shift_offset: -self.shift_offset,
- expr: Arc::clone(&self.expr),
- default_value: self.default_value.clone(),
- ignore_nulls: self.ignore_nulls,
- }))
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ let return_type = parse_expr_type(field_args.input_types())?;
+
+ Ok(Field::new(field_args.name(), return_type, true))
}
+
+ fn reverse_expr(&self) -> ReversedUDWF {
+ match self.kind {
+ WindowShiftKind::Lag => ReversedUDWF::Reversed(lag_udwf()),
+ WindowShiftKind::Lead => ReversedUDWF::Reversed(lead_udwf()),
+ }
+ }
+}
+
+/// When `lead`/`lag` is evaluated on a `NULL` expression we attempt to
+/// refine it by matching it with the type of the default value.
+///
+/// For e.g. in `lead(NULL, 1, false)` the generic `ScalarValue::Null`
+/// is refined into `ScalarValue::Boolean(None)`. Only the type is
+/// refined, the expression value remains `NULL`.
+///
+/// When the window function is evaluated with `NULL` expression
+/// this guarantees that the type matches with that of the default
+/// value.
+///
+/// For more details see: <https://github.com/apache/datafusion/issues/12717>
+fn parse_expr(
+ input_exprs: &[Arc<dyn PhysicalExpr>],
+ input_types: &[DataType],
+) -> Result<Arc<dyn PhysicalExpr>> {
+ assert!(!input_exprs.is_empty());
+ assert!(!input_types.is_empty());
+
+ let expr = Arc::clone(input_exprs.first().unwrap());
+ let expr_type = input_types.first().unwrap();
+
+ // Handles the most common case where NULL is unexpected
+ if !expr_type.is_null() {
+ return Ok(expr);
+ }
+
+ let default_value = get_scalar_value_from_args(input_exprs, 2)?;
+ default_value.map_or(Ok(expr), |value| {
+ ScalarValue::try_from(&value.data_type()).map(|v| {
+ Arc::new(datafusion_physical_expr::expressions::Literal::new(v))
+ as Arc<dyn PhysicalExpr>
+ })
+ })
+}
+
+/// Returns the data type of the default value(if provided) when the
+/// expression is `NULL`.
+///
+/// Otherwise, returns the expression type unchanged.
+fn parse_expr_type(input_types: &[DataType]) -> Result<DataType> {
+ assert!(!input_types.is_empty());
+ let expr_type = input_types.first().unwrap_or(&DataType::Null);
+
+ // Handles the most common case where NULL is unexpected
+ if !expr_type.is_null() {
+ return Ok(expr_type.clone());
+ }
+
+ let default_value_type = input_types.get(2).unwrap_or(&DataType::Null);
+ Ok(default_value_type.clone())
+}
+
+/// Handles type coercion and null value refinement for default value
+/// argument depending on the data type of the input expression.
+fn parse_default_value(
+ input_exprs: &[Arc<dyn PhysicalExpr>],
+ input_types: &[DataType],
+) -> Result<ScalarValue> {
+ let expr_type = parse_expr_type(input_types)?;
+ let unparsed = get_scalar_value_from_args(input_exprs, 2)?;
+
+ unparsed
+ .filter(|v| !v.data_type().is_null())
+ .map(|v| v.cast_to(&expr_type))
+ .unwrap_or(ScalarValue::try_from(expr_type))
}
#[derive(Debug)]
-pub(crate) struct WindowShiftEvaluator {
+struct WindowShiftEvaluator {
shift_offset: i64,
default_value: ScalarValue,
ignore_nulls: bool,
@@ -205,7 +355,7 @@ fn shift_with_default_value(
offset: i64,
default_value: &ScalarValue,
) -> Result<ArrayRef> {
- use arrow::compute::concat;
+ use datafusion_common::arrow::compute::concat;
let value_len = array.len() as i64;
if offset == 0 {
@@ -402,19 +552,22 @@ impl PartitionEvaluator for WindowShiftEvaluator {
#[cfg(test)]
mod tests {
use super::*;
- use crate::expressions::Column;
- use arrow::{array::*, datatypes::*};
+ use arrow::array::*;
use datafusion_common::cast::as_int32_array;
-
- fn test_i32_result(expr: WindowShift, expected: Int32Array) -> Result<()> {
+ use datafusion_physical_expr::expressions::{Column, Literal};
+ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+ fn test_i32_result(
+ expr: WindowShift,
+ partition_evaluator_args: PartitionEvaluatorArgs,
+ expected: Int32Array,
+ ) -> Result<()> {
let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, -2, 3, -4, 5,
-6, 7, 8]));
let values = vec![arr];
- let schema = Schema::new(vec![Field::new("arr", DataType::Int32,
false)]);
- let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
- let values = expr.evaluate_args(&batch)?;
+ let num_rows = values.len();
let result = expr
- .create_evaluator()?
- .evaluate_all(&values, batch.num_rows())?;
+ .partition_evaluator(partition_evaluator_args)?
+ .evaluate_all(&values, num_rows)?;
let result = as_int32_array(&result)?;
assert_eq!(expected, *result);
Ok(())
@@ -466,16 +619,12 @@ mod tests {
}
#[test]
- fn lead_lag_window_shift() -> Result<()> {
+ fn test_lead_window_shift() -> Result<()> {
+ let expr = Arc::new(Column::new("c3", 0)) as Arc<dyn PhysicalExpr>;
+
test_i32_result(
- lead(
- "lead".to_owned(),
- DataType::Int32,
- Arc::new(Column::new("c3", 0)),
- None,
- ScalarValue::Null.cast_to(&DataType::Int32)?,
- false,
- ),
+ WindowShift::lead(),
+ PartitionEvaluatorArgs::new(&[expr], &[DataType::Int32], false,
false),
[
Some(-2),
Some(3),
@@ -488,17 +637,16 @@ mod tests {
]
.iter()
.collect::<Int32Array>(),
- )?;
+ )
+ }
+
+ #[test]
+ fn test_lag_window_shift() -> Result<()> {
+ let expr = Arc::new(Column::new("c3", 0)) as Arc<dyn PhysicalExpr>;
test_i32_result(
- lag(
- "lead".to_owned(),
- DataType::Int32,
- Arc::new(Column::new("c3", 0)),
- None,
- ScalarValue::Null.cast_to(&DataType::Int32)?,
- false,
- ),
+ WindowShift::lag(),
+ PartitionEvaluatorArgs::new(&[expr], &[DataType::Int32], false, false),
[
None,
Some(1),
@@ -511,17 +659,24 @@ mod tests {
]
.iter()
.collect::<Int32Array>(),
- )?;
+ )
+ }
+
+ #[test]
+ fn test_lag_with_default() -> Result<()> {
+ let expr = Arc::new(Column::new("c3", 0)) as Arc<dyn PhysicalExpr>;
+ let shift_offset =
+ Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
+ let default_value = Arc::new(Literal::new(ScalarValue::Int32(Some(100))))
+ as Arc<dyn PhysicalExpr>;
+
+ let input_exprs = &[expr, shift_offset, default_value];
+ let input_types: &[DataType] =
+ &[DataType::Int32, DataType::Int32, DataType::Int32];
test_i32_result(
- lag(
- "lead".to_owned(),
- DataType::Int32,
- Arc::new(Column::new("c3", 0)),
- None,
- ScalarValue::Int32(Some(100)),
- false,
- ),
+ WindowShift::lag(),
+ PartitionEvaluatorArgs::new(input_exprs, input_types, false, false),
[
Some(100),
Some(1),
@@ -534,7 +689,6 @@ mod tests {
]
.iter()
.collect::<Int32Array>(),
- )?;
- Ok(())
+ )
}
}
diff --git a/datafusion/functions-window/src/lib.rs
b/datafusion/functions-window/src/lib.rs
index ef624e13e6..5a2aafa289 100644
--- a/datafusion/functions-window/src/lib.rs
+++ b/datafusion/functions-window/src/lib.rs
@@ -31,11 +31,17 @@ use datafusion_expr::WindowUDF;
#[macro_use]
pub mod macros;
+
+pub mod lead_lag;
+
pub mod rank;
pub mod row_number;
+mod utils;
/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
+ pub use super::lead_lag::lag;
+ pub use super::lead_lag::lead;
pub use super::rank::{dense_rank, percent_rank, rank};
pub use super::row_number::row_number;
}
@@ -44,6 +50,8 @@ pub mod expr_fn {
pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
vec![
row_number::row_number_udwf(),
+ lead_lag::lead_udwf(),
+ lead_lag::lag_udwf(),
rank::rank_udwf(),
rank::dense_rank_udwf(),
rank::percent_rank_udwf(),
diff --git a/datafusion/functions-window/src/utils.rs
b/datafusion/functions-window/src/utils.rs
new file mode 100644
index 0000000000..69f68aa78f
--- /dev/null
+++ b/datafusion/functions-window/src/utils.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
+use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use std::sync::Arc;
+
+pub(crate) fn get_signed_integer(value: ScalarValue) -> Result<i64> {
+ if value.is_null() {
+ return Ok(0);
+ }
+
+ if !value.data_type().is_integer() {
+ return exec_err!("Expected an integer value");
+ }
+
+ value.cast_to(&DataType::Int64)?.try_into()
+}
+
+pub(crate) fn get_scalar_value_from_args(
+ args: &[Arc<dyn PhysicalExpr>],
+ index: usize,
+) -> Result<Option<ScalarValue>> {
+ Ok(if let Some(field) = args.get(index) {
+ let tmp = field
+ .as_any()
+ .downcast_ref::<Literal>()
+ .ok_or_else(|| DataFusionError::NotImplemented(
+ format!("There is only support Literal types for field at idx: {index} in Window Function"),
+ ))?
+ .value()
+ .clone();
+ Some(tmp)
+ } else {
+ None
+ })
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index e07e11e431..54b8aafdb4 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -36,7 +36,6 @@ mod unknown_column;
/// Module with some convenient methods used in expression building
pub use crate::aggregate::stats::StatsType;
pub use crate::window::cume_dist::{cume_dist, CumeDist};
-pub use crate::window::lead_lag::{lag, lead, WindowShift};
pub use crate::window::nth_value::NthValue;
pub use crate::window::ntile::Ntile;
pub use crate::PhysicalSortExpr;
diff --git a/datafusion/physical-expr/src/window/mod.rs
b/datafusion/physical-expr/src/window/mod.rs
index 938bdac50f..c0fe3c2933 100644
--- a/datafusion/physical-expr/src/window/mod.rs
+++ b/datafusion/physical-expr/src/window/mod.rs
@@ -19,7 +19,6 @@ mod aggregate;
mod built_in;
mod built_in_window_function_expr;
pub(crate) mod cume_dist;
-pub(crate) mod lead_lag;
pub(crate) mod nth_value;
pub(crate) mod ntile;
mod sliding_aggregate;
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index e6a773f6b1..adf61f27bc 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -21,7 +21,7 @@ use std::borrow::Borrow;
use std::sync::Arc;
use crate::{
- expressions::{cume_dist, lag, lead, Literal, NthValue, Ntile, PhysicalSortExpr},
+ expressions::{cume_dist, Literal, NthValue, Ntile, PhysicalSortExpr},
ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr,
};
@@ -48,6 +48,7 @@ mod utils;
mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
+use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr::expressions::Column;
@@ -206,52 +207,6 @@ fn get_unsigned_integer(value: ScalarValue) -> Result<u64> {
value.cast_to(&DataType::UInt64)?.try_into()
}
-fn get_casted_value(
- default_value: Option<ScalarValue>,
- dtype: &DataType,
-) -> Result<ScalarValue> {
- match default_value {
- Some(v) if !v.data_type().is_null() => v.cast_to(dtype),
- // If None or Null datatype
- _ => ScalarValue::try_from(dtype),
- }
-}
-
-/// Rewrites the NULL expression (1st argument) with an expression
-/// which is the same data type as the default value (3rd argument).
-/// Also rewrites the return type with the same data type as the
-/// default value.
-///
-/// If a default value is not provided, or it is NULL the original
-/// expression (1st argument) and return type is returned without
-/// any modifications.
-fn rewrite_null_expr_and_data_type(
- args: &[Arc<dyn PhysicalExpr>],
- expr_type: &DataType,
-) -> Result<(Arc<dyn PhysicalExpr>, DataType)> {
- assert!(!args.is_empty());
- let expr = Arc::clone(&args[0]);
-
- // The input expression and the return is type is unchanged
- // when the input expression is not NULL.
- if !expr_type.is_null() {
- return Ok((expr, expr_type.clone()));
- }
-
- get_scalar_value_from_args(args, 2)?
- .and_then(|value| {
- ScalarValue::try_from(value.data_type().clone())
- .map(|sv| {
- Ok((
- Arc::new(Literal::new(sv)) as Arc<dyn PhysicalExpr>,
- value.data_type().clone(),
- ))
- })
- .ok()
- })
- .unwrap_or(Ok((expr, expr_type.clone())))
-}
-
fn create_built_in_window_expr(
fun: &BuiltInWindowFunction,
args: &[Arc<dyn PhysicalExpr>],
@@ -286,42 +241,6 @@ fn create_built_in_window_expr(
Arc::new(Ntile::new(name, n as u64, out_data_type))
}
}
- BuiltInWindowFunction::Lag => {
- // rewrite NULL expression and the return datatype
- let (arg, out_data_type) =
- rewrite_null_expr_and_data_type(args, out_data_type)?;
- let shift_offset = get_scalar_value_from_args(args, 1)?
- .map(get_signed_integer)
- .map_or(Ok(None), |v| v.map(Some))?;
- let default_value =
- get_casted_value(get_scalar_value_from_args(args, 2)?, &out_data_type)?;
- Arc::new(lag(
- name,
- default_value.data_type().clone(),
- arg,
- shift_offset,
- default_value,
- ignore_nulls,
- ))
- }
- BuiltInWindowFunction::Lead => {
- // rewrite NULL expression and the return datatype
- let (arg, out_data_type) =
- rewrite_null_expr_and_data_type(args, out_data_type)?;
- let shift_offset = get_scalar_value_from_args(args, 1)?
- .map(get_signed_integer)
- .map_or(Ok(None), |v| v.map(Some))?;
- let default_value =
- get_casted_value(get_scalar_value_from_args(args, 2)?, &out_data_type)?;
- Arc::new(lead(
- name,
- default_value.data_type().clone(),
- arg,
- shift_offset,
- default_value,
- ignore_nulls,
- ))
- }
BuiltInWindowFunction::NthValue => {
let arg = Arc::clone(&args[0]);
let n = get_signed_integer(
@@ -415,7 +334,8 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
}
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- self.args.clone()
+ self.fun
+ .expressions(ExpressionArgs::new(&self.args, &self.input_types))
}
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 5256f7473c..9964ab498f 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -515,8 +515,8 @@ enum BuiltInWindowFunction {
// PERCENT_RANK = 3;
CUME_DIST = 4;
NTILE = 5;
- LAG = 6;
- LEAD = 7;
+ // LAG = 6;
+ // LEAD = 7;
FIRST_VALUE = 8;
LAST_VALUE = 9;
NTH_VALUE = 10;
@@ -528,7 +528,7 @@ message WindowExprNode {
string udaf = 3;
string udwf = 9;
}
- LogicalExprNode expr = 4;
+ repeated LogicalExprNode exprs = 4;
repeated LogicalExprNode partition_by = 5;
repeated SortExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index e876008e85..4417d11496 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -1664,8 +1664,6 @@ impl serde::Serialize for BuiltInWindowFunction {
Self::Unspecified => "UNSPECIFIED",
Self::CumeDist => "CUME_DIST",
Self::Ntile => "NTILE",
- Self::Lag => "LAG",
- Self::Lead => "LEAD",
Self::FirstValue => "FIRST_VALUE",
Self::LastValue => "LAST_VALUE",
Self::NthValue => "NTH_VALUE",
@@ -1683,8 +1681,6 @@ impl<'de> serde::Deserialize<'de> for
BuiltInWindowFunction {
"UNSPECIFIED",
"CUME_DIST",
"NTILE",
- "LAG",
- "LEAD",
"FIRST_VALUE",
"LAST_VALUE",
"NTH_VALUE",
@@ -1731,8 +1727,6 @@ impl<'de> serde::Deserialize<'de> for
BuiltInWindowFunction {
"UNSPECIFIED" => Ok(BuiltInWindowFunction::Unspecified),
"CUME_DIST" => Ok(BuiltInWindowFunction::CumeDist),
"NTILE" => Ok(BuiltInWindowFunction::Ntile),
- "LAG" => Ok(BuiltInWindowFunction::Lag),
- "LEAD" => Ok(BuiltInWindowFunction::Lead),
"FIRST_VALUE" => Ok(BuiltInWindowFunction::FirstValue),
"LAST_VALUE" => Ok(BuiltInWindowFunction::LastValue),
"NTH_VALUE" => Ok(BuiltInWindowFunction::NthValue),
@@ -21157,7 +21151,7 @@ impl serde::Serialize for WindowExprNode {
{
use serde::ser::SerializeStruct;
let mut len = 0;
- if self.expr.is_some() {
+ if !self.exprs.is_empty() {
len += 1;
}
if !self.partition_by.is_empty() {
@@ -21176,8 +21170,8 @@ impl serde::Serialize for WindowExprNode {
len += 1;
}
let mut struct_ser =
serializer.serialize_struct("datafusion.WindowExprNode", len)?;
- if let Some(v) = self.expr.as_ref() {
- struct_ser.serialize_field("expr", v)?;
+ if !self.exprs.is_empty() {
+ struct_ser.serialize_field("exprs", &self.exprs)?;
}
if !self.partition_by.is_empty() {
struct_ser.serialize_field("partitionBy", &self.partition_by)?;
@@ -21218,7 +21212,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
D: serde::Deserializer<'de>,
{
const FIELDS: &[&str] = &[
- "expr",
+ "exprs",
"partition_by",
"partitionBy",
"order_by",
@@ -21235,7 +21229,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
- Expr,
+ Exprs,
PartitionBy,
OrderBy,
WindowFrame,
@@ -21264,7 +21258,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
E: serde::de::Error,
{
match value {
- "expr" => Ok(GeneratedField::Expr),
+ "exprs" => Ok(GeneratedField::Exprs),
"partitionBy" | "partition_by" =>
Ok(GeneratedField::PartitionBy),
"orderBy" | "order_by" =>
Ok(GeneratedField::OrderBy),
"windowFrame" | "window_frame" =>
Ok(GeneratedField::WindowFrame),
@@ -21291,7 +21285,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
where
V: serde::de::MapAccess<'de>,
{
- let mut expr__ = None;
+ let mut exprs__ = None;
let mut partition_by__ = None;
let mut order_by__ = None;
let mut window_frame__ = None;
@@ -21299,11 +21293,11 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
let mut window_function__ = None;
while let Some(k) = map_.next_key()? {
match k {
- GeneratedField::Expr => {
- if expr__.is_some() {
- return Err(serde::de::Error::duplicate_field("expr"));
+ GeneratedField::Exprs => {
+ if exprs__.is_some() {
+ return Err(serde::de::Error::duplicate_field("exprs"));
}
- expr__ = map_.next_value()?;
+ exprs__ = Some(map_.next_value()?);
}
GeneratedField::PartitionBy => {
if partition_by__.is_some() {
@@ -21352,7 +21346,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
}
}
Ok(WindowExprNode {
- expr: expr__,
+ exprs: exprs__.unwrap_or_default(),
partition_by: partition_by__.unwrap_or_default(),
order_by: order_by__.unwrap_or_default(),
window_frame: window_frame__,
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 2aa14f7e80..d3fe031a48 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -538,7 +538,7 @@ pub mod logical_expr_node {
TryCast(::prost::alloc::boxed::Box<super::TryCastNode>),
/// window expressions
#[prost(message, tag = "18")]
- WindowExpr(::prost::alloc::boxed::Box<super::WindowExprNode>),
+ WindowExpr(super::WindowExprNode),
/// AggregateUDF expressions
#[prost(message, tag = "19")]
AggregateUdfExpr(::prost::alloc::boxed::Box<super::AggregateUdfExprNode>),
@@ -735,8 +735,8 @@ pub struct ScalarUdfExprNode {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct WindowExprNode {
- #[prost(message, optional, boxed, tag = "4")]
- pub expr: ::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
+ #[prost(message, repeated, tag = "4")]
+ pub exprs: ::prost::alloc::vec::Vec<LogicalExprNode>,
#[prost(message, repeated, tag = "5")]
pub partition_by: ::prost::alloc::vec::Vec<LogicalExprNode>,
#[prost(message, repeated, tag = "6")]
@@ -1828,8 +1828,8 @@ pub enum BuiltInWindowFunction {
/// PERCENT_RANK = 3;
CumeDist = 4,
Ntile = 5,
- Lag = 6,
- Lead = 7,
+ /// LAG = 6;
+ /// LEAD = 7;
FirstValue = 8,
LastValue = 9,
NthValue = 10,
@@ -1844,8 +1844,6 @@ impl BuiltInWindowFunction {
Self::Unspecified => "UNSPECIFIED",
Self::CumeDist => "CUME_DIST",
Self::Ntile => "NTILE",
- Self::Lag => "LAG",
- Self::Lead => "LEAD",
Self::FirstValue => "FIRST_VALUE",
Self::LastValue => "LAST_VALUE",
Self::NthValue => "NTH_VALUE",
@@ -1857,8 +1855,6 @@ impl BuiltInWindowFunction {
"UNSPECIFIED" => Some(Self::Unspecified),
"CUME_DIST" => Some(Self::CumeDist),
"NTILE" => Some(Self::Ntile),
- "LAG" => Some(Self::Lag),
- "LEAD" => Some(Self::Lead),
"FIRST_VALUE" => Some(Self::FirstValue),
"LAST_VALUE" => Some(Self::LastValue),
"NTH_VALUE" => Some(Self::NthValue),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 32e1b68203..20d007048a 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -142,8 +142,6 @@ impl From<protobuf::BuiltInWindowFunction> for
BuiltInWindowFunction {
fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self {
match built_in_function {
protobuf::BuiltInWindowFunction::Unspecified => todo!(),
- protobuf::BuiltInWindowFunction::Lag => Self::Lag,
- protobuf::BuiltInWindowFunction::Lead => Self::Lead,
protobuf::BuiltInWindowFunction::FirstValue => Self::FirstValue,
protobuf::BuiltInWindowFunction::CumeDist => Self::CumeDist,
protobuf::BuiltInWindowFunction::Ntile => Self::Ntile,
@@ -286,10 +284,7 @@ pub fn parse_expr(
.map_err(|_| Error::unknown("BuiltInWindowFunction",
*i))?
.into();
- let args =
- parse_optional_expr(expr.expr.as_deref(), registry, codec)?
- .map(|e| vec![e])
- .unwrap_or_else(Vec::new);
+ let args = parse_exprs(&expr.exprs, registry, codec)?;
Expr::WindowFunction(WindowFunction::new(
expr::WindowFunctionDefinition::BuiltInWindowFunction(
@@ -309,10 +304,7 @@ pub fn parse_expr(
None => registry.udaf(udaf_name)?,
};
- let args =
- parse_optional_expr(expr.expr.as_deref(), registry, codec)?
- .map(|e| vec![e])
- .unwrap_or_else(Vec::new);
+ let args = parse_exprs(&expr.exprs, registry, codec)?;
Expr::WindowFunction(WindowFunction::new(
expr::WindowFunctionDefinition::AggregateUDF(udaf_function),
args,
@@ -329,10 +321,7 @@ pub fn parse_expr(
None => registry.udwf(udwf_name)?,
};
- let args =
- parse_optional_expr(expr.expr.as_deref(), registry, codec)?
- .map(|e| vec![e])
- .unwrap_or_else(Vec::new);
+ let args = parse_exprs(&expr.exprs, registry, codec)?;
Expr::WindowFunction(WindowFunction::new(
expr::WindowFunctionDefinition::WindowUDF(udwf_function),
args,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 07823b422f..15fec3a8b2 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -119,8 +119,6 @@ impl From<&BuiltInWindowFunction> for
protobuf::BuiltInWindowFunction {
BuiltInWindowFunction::NthValue => Self::NthValue,
BuiltInWindowFunction::Ntile => Self::Ntile,
BuiltInWindowFunction::CumeDist => Self::CumeDist,
- BuiltInWindowFunction::Lag => Self::Lag,
- BuiltInWindowFunction::Lead => Self::Lead,
}
}
}
@@ -333,25 +331,19 @@ pub fn serialize_expr(
)
}
};
- let arg_expr: Option<Box<protobuf::LogicalExprNode>> = if !args.is_empty() {
- let arg = &args[0];
- Some(Box::new(serialize_expr(arg, codec)?))
- } else {
- None
- };
let partition_by = serialize_exprs(partition_by, codec)?;
let order_by = serialize_sorts(order_by, codec)?;
let window_frame: Option<protobuf::WindowFrame> =
Some(window_frame.try_into()?);
- let window_expr = Box::new(protobuf::WindowExprNode {
- expr: arg_expr,
+ let window_expr = protobuf::WindowExprNode {
+ exprs: serialize_exprs(args, codec)?,
window_function: Some(window_function),
partition_by,
order_by,
window_frame,
fun_definition,
- });
+ };
protobuf::LogicalExprNode {
expr_type: Some(ExprType::WindowExpr(window_expr)),
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 85d4fe8a16..6072baca68 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -25,7 +25,6 @@ use datafusion::physical_expr::{PhysicalSortExpr,
ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, InListExpr,
IsNotNullExpr,
IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, TryCastExpr,
- WindowShift,
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
@@ -119,25 +118,6 @@ pub fn serialize_physical_window_expr(
)))),
);
protobuf::BuiltInWindowFunction::Ntile
- } else if let Some(window_shift_expr) =
- built_in_fn_expr.downcast_ref::<WindowShift>()
- {
- args.insert(
- 1,
- Arc::new(Literal::new(datafusion_common::ScalarValue::Int64(Some(
- window_shift_expr.get_shift_offset(),
- )))),
- );
- args.insert(
- 2,
- Arc::new(Literal::new(window_shift_expr.get_default_value())),
- );
-
- if window_shift_expr.get_shift_offset() >= 0 {
- protobuf::BuiltInWindowFunction::Lag
- } else {
- protobuf::BuiltInWindowFunction::Lead
- }
} else if let Some(nth_value_expr) =
built_in_fn_expr.downcast_ref::<NthValue>() {
match nth_value_expr.get_kind() {
NthValueKind::First =>
protobuf::BuiltInWindowFunction::FirstValue,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index ffa8fc1eef..c017395d97 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -47,8 +47,10 @@ use datafusion::functions_aggregate::expr_fn::{
};
use datafusion::functions_aggregate::min_max::max_udaf;
use datafusion::functions_nested::map::map;
-use datafusion::functions_window::rank::{dense_rank, percent_rank, rank, rank_udwf};
-use datafusion::functions_window::row_number::row_number;
+use datafusion::functions_window::expr_fn::{
+ dense_rank, lag, lead, percent_rank, rank, row_number,
+};
+use datafusion::functions_window::rank::rank_udwf;
use datafusion::prelude::*;
use datafusion::test_util::{TestTableFactory, TestTableProvider};
use datafusion_common::config::TableOptions;
@@ -942,6 +944,12 @@ async fn roundtrip_expr_api() -> Result<()> {
rank(),
dense_rank(),
percent_rank(),
+ lead(col("b"), None, None),
+ lead(col("b"), Some(2), None),
+ lead(col("b"), Some(2), Some(ScalarValue::from(100))),
+ lag(col("b"), None, None),
+ lag(col("b"), Some(2), None),
+ lag(col("b"), Some(2), Some(ScalarValue::from(100))),
nth_value(col("b"), 1, vec![]),
nth_value(
col("b"),
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index a3d0ff4383..fb7afdda2e 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -503,9 +503,9 @@ logical_plan
12)----Projection: Int64(1) AS cnt
13)------Limit: skip=0, fetch=3
14)--------EmptyRelation
-15)----Projection: LEAD(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING AS cnt
+15)----Projection: lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING AS cnt
16)------Limit: skip=0, fetch=3
-17)--------WindowAggr: windowExpr=[[LEAD(b.c1, Int64(1)) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+17)--------WindowAggr: windowExpr=[[lead(b.c1, Int64(1)) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
18)----------SubqueryAlias: b
19)------------Projection: Int64(1) AS c1
20)--------------EmptyRelation
@@ -528,8 +528,8 @@ physical_plan
16)------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c13], has_header=true
17)------ProjectionExec: expr=[1 as cnt]
18)--------PlaceholderRowExec
-19)------ProjectionExec: expr=[LEAD(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
-20)--------BoundedWindowAggExec: wdw=[LEAD(b.c1,Int64(1)) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name:
"LEAD(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+19)------ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
+20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name:
"lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
21)----------ProjectionExec: expr=[1 as c1]
22)------------PlaceholderRowExec
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 1b612f9212..b3f2786d3d 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1376,16 +1376,16 @@ EXPLAIN SELECT
LIMIT 5
----
logical_plan
-01)Projection: aggregate_test_100.c9, first_value(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING AS fv1, first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING AS fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS lag1, LAG(aggregate_test_100.c9,Int64( [...]
+01)Projection: aggregate_test_100.c9, first_value(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING AS fv1, first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING AS fv2, lag(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS lag1, lag(aggregate_test_100.c9,Int64( [...]
02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
-04)------WindowAggr: windowExpr=[[first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING]]
+03)----WindowAggr: windowExpr=[[first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING, lag(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW, lead(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
+04)------WindowAggr: windowExpr=[[first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING, lag(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, lead(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING]]
05)--------TableScan: aggregate_test_100 projection=[c9]
physical_plan
-01)ProjectionExec: expr=[c9@0 as c9, first_value(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@4 as fv1, first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@5 as lag1, LAG(aggregate_test_100.c9,I [...]
+01)ProjectionExec: expr=[c9@0 as c9, first_value(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@4 as fv1, first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@1 as fv2, lag(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@5 as lag1, lag(aggregate_test_100.c9,I [...]
02)--GlobalLimitExec: skip=0, fetch=5
-03)----BoundedWindowAggExec: wdw=[first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING: Ok(Field { name: "first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(5)), end_bound: Following(UInt64(1)), [...]
-04)------BoundedWindowAggExec: wdw=[first_value(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING: Ok(Field { name: "first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt6 [...]
+03)----BoundedWindowAggExec: wdw=[first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING: Ok(Field { name: "first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(5)), end_bound: Following(UInt64(1)), [...]
+04)------BoundedWindowAggExec: wdw=[first_value(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING: Ok(Field { name: "first_value(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt6 [...]
05)--------SortExec: expr=[c9@0 DESC], preserve_partitioning=[false]
06)----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9],
has_header=true
@@ -2636,15 +2636,15 @@ EXPLAIN SELECT
----
logical_plan
01)Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5
-02)--Projection: annotated_data_finite.ts,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2,
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1,
last_value(annot [...]
-03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER [...]
-04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col [...]
+02)--Projection: annotated_data_finite.ts,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2,
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1,
last_value(annot [...]
+03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER [...]
+04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col [...]
05)--------TableScan: annotated_data_finite projection=[ts, inc_col]
physical_plan
01)SortExec: TopK(fetch=5), expr=[ts@0 DESC], preserve_partitioning=[false]
-02)--ProjectionExec: expr=[ts@0 as ts,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2,
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1,
last_value( [...]
-03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(10)), end_bo [...]
-04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(1)), e [...]
+02)--ProjectionExec: expr=[ts@0 as ts,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2,
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1,
last_value( [...]
+03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(10)), end_bo [...]
+04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(1)), e [...]
05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts,
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
query IIIIIIIIIIIIIIIIIIIIIIIII
@@ -4971,6 +4971,26 @@ SELECT LAG(NULL, 1, false) OVER () FROM t1;
----
false
+query B
+SELECT LEAD(NULL, 0, true) OVER () FROM t1;
+----
+NULL
+
+query B
+SELECT LAG(NULL, 0, true) OVER () FROM t1;
+----
+NULL
+
+query B
+SELECT LEAD(NULL, 1, true) OVER () FROM t1;
+----
+true
+
+query B
+SELECT LAG(NULL, 1, true) OVER () FROM t1;
+----
+true
+
statement ok
insert into t1 values (2);
@@ -4986,6 +5006,18 @@ SELECT LAG(NULL, 1, false) OVER () FROM t1;
false
NULL
+query B
+SELECT LEAD(NULL, 1, true) OVER () FROM t1;
+----
+NULL
+true
+
+query B
+SELECT LAG(NULL, 1, true) OVER () FROM t1;
+----
+true
+NULL
+
statement ok
DROP TABLE t1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]