This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new efe5708978 Convert `BuiltInWindowFunction::{Lead, Lag}` to a user defined window function (#12857)
efe5708978 is described below

commit efe5708978a480d11d5406a7d7df76d73e15c5d7
Author: jcsherin <[email protected]>
AuthorDate: Fri Oct 18 16:56:41 2024 +0530

    Convert `BuiltInWindowFunction::{Lead, Lag}` to a user defined window function (#12857)
    
    * Move `lead-lag` to `functions-window` package
    
    * Builds with warnings
    
    * Adds `PartitionEvaluatorArgs`
    
    * Extracts `shift_offset` from input expressions
    
    * Computes shift offset
    
    * Get default value from input expression
    
    * Implements `partition_evaluator`
    
    * Fixes compiler warnings
    
    * Comments out failing tests
    
    * Fixes `cargo test` errors and warnings
    
    * Minor: taplo formatting
    
    * Delete code
    
    * Define `lead`, `lag` user-defined window functions
    
    * Fixes `cargo build` errors
    
    * Export udwf and expression public APIs
    
    * Mark result field as nullable
    
    * Delete `return_type` tests for `lead` and `lag`
    
    * Disables test: window function case insensitive
    
    * Fixes: lowercase name in logical plan
    
    * Reverts to old methods for computing `shift_offset`, `default_value`
    
    * Implements expression reversal
    
    * Fixes: lowercase name in logical plans
    
    * Fixes: doc test compilation errors
    Fixes: doc test build errors
    
    * Temporarily quiet clippy errors
    
    * Fixes proto definition
    
    * Minor: fixes formatting
    
    * Fixes: doc tests
    
    * Uses macro for defining `lag_udwf()` and `lead_udwf()`
    
    * Fixes: window fuzz test cases
    
    * Copies doc comments verbatim from `BuiltInWindowFunction` enum
    
    * Deletes from window function case insensitive test
    
    * Deletes `BuiltInWindowFunction` expression APIs
    
    * Delete from `create_built_in_window_expr`
    
    * Deletes proto serialization
    
    * Delete from `BuiltInWindowFunction` enum
    
    * Deletes test for finding built-in window function
    
    * Fixes build errors + deletes redundant code
    
    * Deletes more code
    
    * Delete unnecessary structs
    
    * Refactors shift offset computation
    
    * Passes range unit test
    
    * Fixes: clippy::get-first error
    
    * Rewrite unit tests for WindowUDF
    
    * Fixes: unit test for lag with default value
    
    * Consistent input expressions and data types in unit tests
    
    * Minor: fixes formatting
    
    * Restore original helper method for unit tests
    
    * Revert "Refactors shift offset computation"
    
    This reverts commit 000ceb76409e66230f9c5017a30fa3c9bb1e6575.
    
    * Moves helper functions into `functions-window-common` package
    
    * Uses common helper functions in `{lead, lag}`
    
    * Minor: formatting
    
    * Revert "Moves helper functions into `functions-window-common` package"
    
    This reverts commit ab8a83c9c11ca3a245278f6f300438feaacb0978.
    
    * Moves common functions to utils
    
    * Minor: formatting fixes
    
    * Update lowercase names in explain output
    
    * Adds doc for `lead()` and `lag()` expression functions
    
    * Add doc for `WindowShiftKind::shift_offset`
    
    * Remove `arrow` dev dependency
    
    * Minor: formatting
    
    * Update inner doc comment
    
    * Serialize 1 or more window function arguments
    
    * Adds logical plan roundtrip test cases
    
    * Refactor: readability of unit tests
    
    * Minor: rename variable bindings
    
    * Minor: copy edit
    
    * Revert "Remove `arrow` dev dependency"
    
    This reverts commit 3eb09856c8ec4ddce20472deee2df590c2fd3f35.
    
    * Move null argument handling helper to utils
    
    * Disable failing sqllogic tests for handling NULL input
    
    * Revert "Disable failing sqllogic tests for handling NULL input"
    
    This reverts commit 270a2030637012d549c001e973a0a1bb6b3d4dd0.
    
    * Fixes: incorrect NULL handling in `lead`/`lag` window function
    
    * Adds more test cases
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion-cli/Cargo.lock                          |   1 +
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    |  13 +-
 datafusion/expr/src/built_in_window_function.rs    |  32 +-
 datafusion/expr/src/expr.rs                        |  38 --
 datafusion/expr/src/udwf.rs                        |  23 ++
 datafusion/expr/src/window_function.rs             |  34 --
 datafusion/functions-window-common/src/expr.rs     |  64 ++++
 datafusion/functions-window-common/src/lib.rs      |   1 +
 datafusion/functions-window/Cargo.toml             |   1 +
 .../window => functions-window/src}/lead_lag.rs    | 392 ++++++++++++++-------
 datafusion/functions-window/src/lib.rs             |   8 +
 datafusion/functions-window/src/utils.rs           |  53 +++
 datafusion/physical-expr/src/expressions/mod.rs    |   1 -
 datafusion/physical-expr/src/window/mod.rs         |   1 -
 datafusion/physical-plan/src/windows/mod.rs        |  88 +----
 datafusion/proto/proto/datafusion.proto            |   6 +-
 datafusion/proto/src/generated/pbjson.rs           |  30 +-
 datafusion/proto/src/generated/prost.rs            |  14 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |  17 +-
 datafusion/proto/src/logical_plan/to_proto.rs      |  14 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |  20 --
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  12 +-
 datafusion/sqllogictest/test_files/union.slt       |   8 +-
 datafusion/sqllogictest/test_files/window.slt      |  56 ++-
 24 files changed, 520 insertions(+), 407 deletions(-)

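Note on the offset convention: the commit message mentions computing the shift offset and implementing expression reversal. The sketch below models that sign convention with the standard library only. `Kind`, `shift_offset` and `effective_offset` are hypothetical stand-ins written for illustration (they loosely mirror `WindowShiftKind::shift_offset` and the `is_reversed()` branch in the `lead_lag.rs` diff); it is not the committed code.

```rust
/// Hypothetical stand-in for the private `WindowShiftKind` enum in the diff.
#[derive(Debug, Clone, Copy)]
enum Kind {
    Lag,
    Lead,
}

/// A positive result means "look back" (lag); `lead` negates the
/// user-supplied offset. A missing offset defaults to 1 (lag) or -1 (lead).
fn shift_offset(kind: Kind, value: Option<i64>) -> i64 {
    match kind {
        Kind::Lag => value.unwrap_or(1),
        Kind::Lead => value.map(|v| -v).unwrap_or(-1),
    }
}

/// When the window is evaluated in reverse order the sign is flipped,
/// so a reversed `lag` scans like a `lead` and vice versa.
fn effective_offset(kind: Kind, value: Option<i64>, reversed: bool) -> i64 {
    let offset = shift_offset(kind, value);
    if reversed {
        -offset
    } else {
        offset
    }
}
```

Under these assumptions, `effective_offset(Kind::Lead, Some(2), true)` gives the same offset as `effective_offset(Kind::Lag, Some(2), false)`, which is why reversing a `lead` can reuse the same evaluator.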
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index aa64e14fca..dfd07a7658 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1445,6 +1445,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr",
  "datafusion-functions-window-common",
+ "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "log",
  "paste",
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 4a33334770..d649919f1b 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -45,6 +45,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use test_utils::add_empty_batches;
 
 use datafusion::functions_window::row_number::row_number_udwf;
+use datafusion_functions_window::lead_lag::{lag_udwf, lead_udwf};
 use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf};
 use hashbrown::HashMap;
 use rand::distributions::Alphanumeric;
@@ -197,7 +198,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
         // )
         (
             // Window function
-            WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Lag),
+            WindowFunctionDefinition::WindowUDF(lag_udwf()),
             // its name
             "LAG",
             // no argument
@@ -211,7 +212,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
         // )
         (
             // Window function
-            WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Lead),
+            WindowFunctionDefinition::WindowUDF(lead_udwf()),
             // its name
             "LEAD",
             // no argument
@@ -393,9 +394,7 @@ fn get_random_function(
         window_fn_map.insert(
             "lead",
             (
-                WindowFunctionDefinition::BuiltInWindowFunction(
-                    BuiltInWindowFunction::Lead,
-                ),
+                WindowFunctionDefinition::WindowUDF(lead_udwf()),
                 vec![
                     arg.clone(),
                     lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))),
@@ -406,9 +405,7 @@ fn get_random_function(
         window_fn_map.insert(
             "lag",
             (
-                WindowFunctionDefinition::BuiltInWindowFunction(
-                    BuiltInWindowFunction::Lag,
-                ),
+                WindowFunctionDefinition::WindowUDF(lag_udwf()),
                 vec![
                     arg.clone(),
                     lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))),
diff --git a/datafusion/expr/src/built_in_window_function.rs b/datafusion/expr/src/built_in_window_function.rs
index 6a30080fb3..2c70a07a4e 100644
--- a/datafusion/expr/src/built_in_window_function.rs
+++ b/datafusion/expr/src/built_in_window_function.rs
@@ -22,7 +22,7 @@ use std::str::FromStr;
 
 use crate::type_coercion::functions::data_types;
 use crate::utils;
-use crate::{Signature, TypeSignature, Volatility};
+use crate::{Signature, Volatility};
 use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
 
 use arrow::datatypes::DataType;
@@ -44,17 +44,7 @@ pub enum BuiltInWindowFunction {
     CumeDist,
     /// Integer ranging from 1 to the argument value, dividing the partition as equally as possible
     Ntile,
-    /// Returns value evaluated at the row that is offset rows before the current row within the partition;
-    /// If there is no such row, instead return default (which must be of the same type as value).
-    /// Both offset and default are evaluated with respect to the current row.
-    /// If omitted, offset defaults to 1 and default to null
-    Lag,
-    /// Returns value evaluated at the row that is offset rows after the current row within the partition;
-    /// If there is no such row, instead return default (which must be of the same type as value).
-    /// Both offset and default are evaluated with respect to the current row.
-    /// If omitted, offset defaults to 1 and default to null
-    Lead,
-    /// Returns value evaluated at the row that is the first row of the window frame
+    /// returns value evaluated at the row that is the first row of the window frame
     FirstValue,
     /// Returns value evaluated at the row that is the last row of the window frame
     LastValue,
@@ -68,8 +58,6 @@ impl BuiltInWindowFunction {
         match self {
             CumeDist => "CUME_DIST",
             Ntile => "NTILE",
-            Lag => "LAG",
-            Lead => "LEAD",
             FirstValue => "first_value",
             LastValue => "last_value",
             NthValue => "NTH_VALUE",
@@ -83,8 +71,6 @@ impl FromStr for BuiltInWindowFunction {
         Ok(match name.to_uppercase().as_str() {
             "CUME_DIST" => BuiltInWindowFunction::CumeDist,
             "NTILE" => BuiltInWindowFunction::Ntile,
-            "LAG" => BuiltInWindowFunction::Lag,
-            "LEAD" => BuiltInWindowFunction::Lead,
             "FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
             "LAST_VALUE" => BuiltInWindowFunction::LastValue,
             "NTH_VALUE" => BuiltInWindowFunction::NthValue,
@@ -117,9 +103,7 @@ impl BuiltInWindowFunction {
         match self {
             BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
             BuiltInWindowFunction::CumeDist => Ok(DataType::Float64),
-            BuiltInWindowFunction::Lag
-            | BuiltInWindowFunction::Lead
-            | BuiltInWindowFunction::FirstValue
+            BuiltInWindowFunction::FirstValue
             | BuiltInWindowFunction::LastValue
             | BuiltInWindowFunction::NthValue => Ok(input_expr_types[0].clone()),
         }
@@ -130,16 +114,6 @@ impl BuiltInWindowFunction {
         // Note: The physical expression must accept the type returned by this function or the execution panics.
         match self {
             BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
-            BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => {
-                Signature::one_of(
-                    vec![
-                        TypeSignature::Any(1),
-                        TypeSignature::Any(2),
-                        TypeSignature::Any(3),
-                    ],
-                    Volatility::Immutable,
-                )
-            }
             BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue => {
                 Signature::any(1, Volatility::Immutable)
             }
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 3e692189e4..f3f71a8727 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -2560,30 +2560,6 @@ mod test {
         Ok(())
     }
 
-    #[test]
-    fn test_lead_return_type() -> Result<()> {
-        let fun = find_df_window_func("lead").unwrap();
-        let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
-        assert_eq!(DataType::Utf8, observed);
-
-        let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
-        assert_eq!(DataType::Float64, observed);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_lag_return_type() -> Result<()> {
-        let fun = find_df_window_func("lag").unwrap();
-        let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
-        assert_eq!(DataType::Utf8, observed);
-
-        let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
-        assert_eq!(DataType::Float64, observed);
-
-        Ok(())
-    }
-
     #[test]
     fn test_nth_value_return_type() -> Result<()> {
         let fun = find_df_window_func("nth_value").unwrap();
@@ -2621,8 +2597,6 @@ mod test {
         let names = vec![
             "cume_dist",
             "ntile",
-            "lag",
-            "lead",
             "first_value",
             "last_value",
             "nth_value",
@@ -2660,18 +2634,6 @@ mod test {
                 built_in_window_function::BuiltInWindowFunction::LastValue
             ))
         );
-        assert_eq!(
-            find_df_window_func("LAG"),
-            Some(WindowFunctionDefinition::BuiltInWindowFunction(
-                built_in_window_function::BuiltInWindowFunction::Lag
-            ))
-        );
-        assert_eq!(
-            find_df_window_func("LEAD"),
-            Some(WindowFunctionDefinition::BuiltInWindowFunction(
-                built_in_window_function::BuiltInWindowFunction::Lead
-            ))
-        );
         assert_eq!(find_df_window_func("not_exist"), None)
     }
 
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index 6d8f2be97e..6ab94c1e84 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -34,8 +34,10 @@ use crate::{
     Signature,
 };
 use datafusion_common::{not_impl_err, Result};
+use datafusion_functions_window_common::expr::ExpressionArgs;
 use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
 /// Logical representation of a user-defined window function (UDWF)
 /// A UDWF is different from a UDF in that it is stateful across batches.
@@ -149,6 +151,12 @@ impl WindowUDF {
         self.inner.simplify()
     }
 
+    /// Expressions that are passed to the [`PartitionEvaluator`].
+    ///
+    /// See [`WindowUDFImpl::expressions`] for more details.
+    pub fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.inner.expressions(expr_args)
+    }
     /// Return a `PartitionEvaluator` for evaluating this window function
     pub fn partition_evaluator_factory(
         &self,
@@ -302,6 +310,14 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
     /// types are accepted and the function's Volatility.
     fn signature(&self) -> &Signature;
 
+    /// Returns the expressions that are passed to the [`PartitionEvaluator`].
+    fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
+        expr_args
+            .input_exprs()
+            .first()
+            .map_or(vec![], |expr| vec![Arc::clone(expr)])
+    }
+
     /// Invoke the function, returning the [`PartitionEvaluator`] instance
     fn partition_evaluator(
         &self,
@@ -480,6 +496,13 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
         self.inner.signature()
     }
 
+    fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
+        expr_args
+            .input_exprs()
+            .first()
+            .map_or(vec![], |expr| vec![Arc::clone(expr)])
+    }
+
     fn partition_evaluator(
         &self,
         partition_evaluator_args: PartitionEvaluatorArgs,
diff --git a/datafusion/expr/src/window_function.rs b/datafusion/expr/src/window_function.rs
index 7ac6fb7d16..3e1870c59c 100644
--- a/datafusion/expr/src/window_function.rs
+++ b/datafusion/expr/src/window_function.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::ScalarValue;
-
 use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};
 
 /// Create an expression to represent the `cume_dist` window function
@@ -29,38 +27,6 @@ pub fn ntile(arg: Expr) -> Expr {
     Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Ntile, vec![arg]))
 }
 
-/// Create an expression to represent the `lag` window function
-pub fn lag(
-    arg: Expr,
-    shift_offset: Option<i64>,
-    default_value: Option<ScalarValue>,
-) -> Expr {
-    let shift_offset_lit = shift_offset
-        .map(|v| v.lit())
-        .unwrap_or(ScalarValue::Null.lit());
-    let default_lit = default_value.unwrap_or(ScalarValue::Null).lit();
-    Expr::WindowFunction(WindowFunction::new(
-        BuiltInWindowFunction::Lag,
-        vec![arg, shift_offset_lit, default_lit],
-    ))
-}
-
-/// Create an expression to represent the `lead` window function
-pub fn lead(
-    arg: Expr,
-    shift_offset: Option<i64>,
-    default_value: Option<ScalarValue>,
-) -> Expr {
-    let shift_offset_lit = shift_offset
-        .map(|v| v.lit())
-        .unwrap_or(ScalarValue::Null.lit());
-    let default_lit = default_value.unwrap_or(ScalarValue::Null).lit();
-    Expr::WindowFunction(WindowFunction::new(
-        BuiltInWindowFunction::Lead,
-        vec![arg, shift_offset_lit, default_lit],
-    ))
-}
-
 /// Create an expression to represent the `nth_value` window function
 pub fn nth_value(arg: Expr, n: i64) -> Expr {
     Expr::WindowFunction(WindowFunction::new(
diff --git a/datafusion/functions-window-common/src/expr.rs b/datafusion/functions-window-common/src/expr.rs
new file mode 100644
index 0000000000..1d99fe7acf
--- /dev/null
+++ b/datafusion/functions-window-common/src/expr.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use std::sync::Arc;
+
+/// Arguments passed to user-defined window function
+#[derive(Debug, Default)]
+pub struct ExpressionArgs<'a> {
+    /// The expressions passed as arguments to the user-defined window
+    /// function.
+    input_exprs: &'a [Arc<dyn PhysicalExpr>],
+    /// The corresponding data types of expressions passed as arguments
+    /// to the user-defined window function.
+    input_types: &'a [DataType],
+}
+
+impl<'a> ExpressionArgs<'a> {
+    /// Create an instance of [`ExpressionArgs`].
+    ///
+    /// # Arguments
+    ///
+    /// * `input_exprs` - The expressions passed as arguments
+    ///     to the user-defined window function.
+    /// * `input_types` - The data types corresponding to the
+    ///     arguments to the user-defined window function.
+    ///
+    pub fn new(
+        input_exprs: &'a [Arc<dyn PhysicalExpr>],
+        input_types: &'a [DataType],
+    ) -> Self {
+        Self {
+            input_exprs,
+            input_types,
+        }
+    }
+
+    /// Returns the expressions passed as arguments to the user-defined
+    /// window function.
+    pub fn input_exprs(&self) -> &'a [Arc<dyn PhysicalExpr>] {
+        self.input_exprs
+    }
+
+    /// Returns the [`DataType`]s corresponding to the input expressions
+    /// to the user-defined window function.
+    pub fn input_types(&self) -> &'a [DataType] {
+        self.input_types
+    }
+}
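Note on how `ExpressionArgs` is consumed: the default `WindowUDFImpl::expressions` in `udwf.rs` forwards only the first input expression to the partition evaluator, or nothing when there are no inputs. The sketch below models that behaviour with the standard library only. `Args` and `default_expressions` are hypothetical names, and `Arc<str>` stands in for `Arc<dyn PhysicalExpr>`; it is an illustration under those assumptions, not the DataFusion API.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `ExpressionArgs`: it only borrows the inputs.
struct Args<'a> {
    input_exprs: &'a [Arc<str>],
}

impl<'a> Args<'a> {
    /// Returns the borrowed slice with the original lifetime `'a`.
    fn input_exprs(&self) -> &'a [Arc<str>] {
        self.input_exprs
    }
}

/// Models the default behaviour: keep the first expression (cheaply
/// cloning the `Arc`), or return an empty vector when there is none.
fn default_expressions(args: Args<'_>) -> Vec<Arc<str>> {
    args.input_exprs()
        .first()
        .map_or(vec![], |expr| vec![Arc::clone(expr)])
}
```

With inputs modelling `lag(c1, 2, 0)`, only `c1` is forwarded; the offset and default value are read separately when the evaluator is built.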
diff --git a/datafusion/functions-window-common/src/lib.rs b/datafusion/functions-window-common/src/lib.rs
index 53f9eb1c9a..da8d096da5 100644
--- a/datafusion/functions-window-common/src/lib.rs
+++ b/datafusion/functions-window-common/src/lib.rs
@@ -18,5 +18,6 @@
 //! Common user-defined window functionality for [DataFusion]
 //!
 //! [DataFusion]: <https://crates.io/crates/datafusion>
+pub mod expr;
 pub mod field;
 pub mod partition;
diff --git a/datafusion/functions-window/Cargo.toml b/datafusion/functions-window/Cargo.toml
index 952e5720c7..262c21fcec 100644
--- a/datafusion/functions-window/Cargo.toml
+++ b/datafusion/functions-window/Cargo.toml
@@ -41,6 +41,7 @@ path = "src/lib.rs"
 datafusion-common = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
+datafusion-physical-expr = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 log = { workspace = true }
 paste = "1.0.15"
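Note on NULL handling in the `lead_lag.rs` diff that follows: when the first argument to `lead`/`lag` has the `Null` type, the result type is taken from the default value (third argument) if one is given, as in `lead(NULL, 1, false)`. The sketch below models that rule with the standard library only. `Ty` and `refine_type` are hypothetical names standing in for Arrow's `DataType` and the `parse_expr_type` helper. Unlike the committed helper, which asserts that the input is non-empty, this sketch returns `Ty::Null` for an empty slice.

```rust
/// Hypothetical, reduced stand-in for Arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Null,
    Boolean,
    Int64,
}

/// If the expression type is not `Null`, keep it unchanged. Otherwise
/// fall back to the type of the default value at index 2, or `Null`
/// when no default value was supplied.
fn refine_type(input_types: &[Ty]) -> Ty {
    let expr_type = input_types.first().cloned().unwrap_or(Ty::Null);
    if expr_type != Ty::Null {
        return expr_type;
    }
    input_types.get(2).cloned().unwrap_or(Ty::Null)
}
```

Only the type is refined in this model; the value of the expression stays NULL, matching the doc comment on `parse_expr`.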
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/functions-window/src/lead_lag.rs
similarity index 59%
rename from datafusion/physical-expr/src/window/lead_lag.rs
rename to datafusion/functions-window/src/lead_lag.rs
index 1656b7c303..f815210997 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/functions-window/src/lead_lag.rs
@@ -15,125 +15,275 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines physical expression for `lead` and `lag` that can evaluated
-//! at runtime during query execution
-use crate::window::BuiltInWindowFunctionExpr;
-use crate::PhysicalExpr;
-use arrow::array::ArrayRef;
-use arrow::datatypes::{DataType, Field};
-use arrow_array::Array;
+//! `lead` and `lag` window function implementations
+
+use crate::utils::{get_scalar_value_from_args, get_signed_integer};
+use datafusion_common::arrow::array::ArrayRef;
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
 use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
-use datafusion_expr::PartitionEvaluator;
+use datafusion_expr::{
+    Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature, Volatility,
+    WindowUDFImpl,
+};
+use datafusion_functions_window_common::expr::ExpressionArgs;
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use std::any::Any;
 use std::cmp::min;
 use std::collections::VecDeque;
 use std::ops::{Neg, Range};
 use std::sync::Arc;
 
-/// window shift expression
+get_or_init_udwf!(
+    Lag,
+    lag,
+    "Returns the row value that precedes the current row by a specified \
+    offset within partition. If no such row exists, then returns the \
+    default value.",
+    WindowShift::lag
+);
+get_or_init_udwf!(
+    Lead,
+    lead,
+    "Returns the value from a row that follows the current row by a \
+    specified offset within the partition. If no such row exists, then \
+    returns the default value.",
+    WindowShift::lead
+);
+
+/// Create an expression to represent the `lag` window function
+///
+/// returns value evaluated at the row that is offset rows before the current row within the partition;
+/// if there is no such row, instead return default (which must be of the same type as value).
+/// Both offset and default are evaluated with respect to the current row.
+/// If omitted, offset defaults to 1 and default to null
+pub fn lag(
+    arg: datafusion_expr::Expr,
+    shift_offset: Option<i64>,
+    default_value: Option<ScalarValue>,
+) -> datafusion_expr::Expr {
+    let shift_offset_lit = shift_offset
+        .map(|v| v.lit())
+        .unwrap_or(ScalarValue::Null.lit());
+    let default_lit = default_value.unwrap_or(ScalarValue::Null).lit();
+
+    lag_udwf().call(vec![arg, shift_offset_lit, default_lit])
+}
+
+/// Create an expression to represent the `lead` window function
+///
+/// returns value evaluated at the row that is offset rows after the current row within the partition;
+/// if there is no such row, instead return default (which must be of the same type as value).
+/// Both offset and default are evaluated with respect to the current row.
+/// If omitted, offset defaults to 1 and default to null
+pub fn lead(
+    arg: datafusion_expr::Expr,
+    shift_offset: Option<i64>,
+    default_value: Option<ScalarValue>,
+) -> datafusion_expr::Expr {
+    let shift_offset_lit = shift_offset
+        .map(|v| v.lit())
+        .unwrap_or(ScalarValue::Null.lit());
+    let default_lit = default_value.unwrap_or(ScalarValue::Null).lit();
+
+    lead_udwf().call(vec![arg, shift_offset_lit, default_lit])
+}
+
 #[derive(Debug)]
-pub struct WindowShift {
-    name: String,
-    /// Output data type
-    data_type: DataType,
-    shift_offset: i64,
-    expr: Arc<dyn PhysicalExpr>,
-    default_value: ScalarValue,
-    ignore_nulls: bool,
+enum WindowShiftKind {
+    Lag,
+    Lead,
 }
 
-impl WindowShift {
-    /// Get shift_offset of window shift expression
-    pub fn get_shift_offset(&self) -> i64 {
-        self.shift_offset
+impl WindowShiftKind {
+    fn name(&self) -> &'static str {
+        match self {
+            WindowShiftKind::Lag => "lag",
+            WindowShiftKind::Lead => "lead",
+        }
     }
 
-    /// Get the default_value for window shift expression.
-    pub fn get_default_value(&self) -> ScalarValue {
-        self.default_value.clone()
+    /// In [`WindowShiftEvaluator`] a positive offset is used to signal
+    /// computation of `lag()`. So here we negate the input offset
+    /// value when computing `lead()`.
+    fn shift_offset(&self, value: Option<i64>) -> i64 {
+        match self {
+            WindowShiftKind::Lag => value.unwrap_or(1),
+            WindowShiftKind::Lead => value.map(|v| v.neg()).unwrap_or(-1),
+        }
     }
 }
 
-/// lead() window function
-pub fn lead(
-    name: String,
-    data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
-    shift_offset: Option<i64>,
-    default_value: ScalarValue,
-    ignore_nulls: bool,
-) -> WindowShift {
-    WindowShift {
-        name,
-        data_type,
-        shift_offset: shift_offset.map(|v| v.neg()).unwrap_or(-1),
-        expr,
-        default_value,
-        ignore_nulls,
-    }
+/// window shift expression
+#[derive(Debug)]
+pub struct WindowShift {
+    signature: Signature,
+    kind: WindowShiftKind,
 }
 
-/// lag() window function
-pub fn lag(
-    name: String,
-    data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
-    shift_offset: Option<i64>,
-    default_value: ScalarValue,
-    ignore_nulls: bool,
-) -> WindowShift {
-    WindowShift {
-        name,
-        data_type,
-        shift_offset: shift_offset.unwrap_or(1),
-        expr,
-        default_value,
-        ignore_nulls,
+impl WindowShift {
+    fn new(kind: WindowShiftKind) -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Any(1),
+                    TypeSignature::Any(2),
+                    TypeSignature::Any(3),
+                ],
+                Volatility::Immutable,
+            ),
+            kind,
+        }
+    }
+
+    pub fn lag() -> Self {
+        Self::new(WindowShiftKind::Lag)
+    }
+
+    pub fn lead() -> Self {
+        Self::new(WindowShiftKind::Lead)
     }
 }
 
-impl BuiltInWindowFunctionExpr for WindowShift {
-    /// Return a reference to Any that can be used for downcasting
+impl WindowUDFImpl for WindowShift {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
-    fn field(&self) -> Result<Field> {
-        let nullable = true;
-        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    fn name(&self) -> &str {
+        self.kind.name()
     }
 
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![Arc::clone(&self.expr)]
+    fn signature(&self) -> &Signature {
+        &self.signature
     }
 
-    fn name(&self) -> &str {
-        &self.name
+    /// Handles the case where `NULL` expression is passed as an
+    /// argument to `lead`/`lag`. The type is refined depending
+    /// on the default value argument.
+    ///
+    /// For more details see: <https://github.com/apache/datafusion/issues/12717>
+    fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
+        parse_expr(expr_args.input_exprs(), expr_args.input_types())
+            .into_iter()
+            .collect::<Vec<_>>()
     }
 
-    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+    fn partition_evaluator(
+        &self,
+        partition_evaluator_args: PartitionEvaluatorArgs,
+    ) -> Result<Box<dyn PartitionEvaluator>> {
+        let shift_offset =
+            get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 1)?
+                .map(get_signed_integer)
+                .map_or(Ok(None), |v| v.map(Some))
+                .map(|n| self.kind.shift_offset(n))
+                .map(|offset| {
+                    if partition_evaluator_args.is_reversed() {
+                        -offset
+                    } else {
+                        offset
+                    }
+                })?;
+        let default_value = parse_default_value(
+            partition_evaluator_args.input_exprs(),
+            partition_evaluator_args.input_types(),
+        )?;
+
         Ok(Box::new(WindowShiftEvaluator {
-            shift_offset: self.shift_offset,
-            default_value: self.default_value.clone(),
-            ignore_nulls: self.ignore_nulls,
+            shift_offset,
+            default_value,
+            ignore_nulls: partition_evaluator_args.ignore_nulls(),
             non_null_offsets: VecDeque::new(),
         }))
     }
 
-    fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
-        Some(Arc::new(Self {
-            name: self.name.clone(),
-            data_type: self.data_type.clone(),
-            shift_offset: -self.shift_offset,
-            expr: Arc::clone(&self.expr),
-            default_value: self.default_value.clone(),
-            ignore_nulls: self.ignore_nulls,
-        }))
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        let return_type = parse_expr_type(field_args.input_types())?;
+
+        Ok(Field::new(field_args.name(), return_type, true))
     }
+
+    fn reverse_expr(&self) -> ReversedUDWF {
+        match self.kind {
+            WindowShiftKind::Lag => ReversedUDWF::Reversed(lag_udwf()),
+            WindowShiftKind::Lead => ReversedUDWF::Reversed(lead_udwf()),
+        }
+    }
+}
+
+/// When `lead`/`lag` is evaluated on a `NULL` expression we attempt to
+/// refine it by matching it with the type of the default value.
+///
+/// For e.g. in `lead(NULL, 1, false)` the generic `ScalarValue::Null`
+/// is refined into `ScalarValue::Boolean(None)`. Only the type is
+/// refined, the expression value remains `NULL`.
+///
+/// When the window function is evaluated with `NULL` expression
+/// this guarantees that the type matches with that of the default
+/// value.
+///
+/// For more details see: <https://github.com/apache/datafusion/issues/12717>
+fn parse_expr(
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    input_types: &[DataType],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    assert!(!input_exprs.is_empty());
+    assert!(!input_types.is_empty());
+
+    let expr = Arc::clone(input_exprs.first().unwrap());
+    let expr_type = input_types.first().unwrap();
+
+    // Handles the most common case where NULL is unexpected
+    if !expr_type.is_null() {
+        return Ok(expr);
+    }
+
+    let default_value = get_scalar_value_from_args(input_exprs, 2)?;
+    default_value.map_or(Ok(expr), |value| {
+        ScalarValue::try_from(&value.data_type()).map(|v| {
+            Arc::new(datafusion_physical_expr::expressions::Literal::new(v))
+                as Arc<dyn PhysicalExpr>
+        })
+    })
+}
+
+/// Returns the data type of the default value(if provided) when the
+/// expression is `NULL`.
+///
+/// Otherwise, returns the expression type unchanged.
+fn parse_expr_type(input_types: &[DataType]) -> Result<DataType> {
+    assert!(!input_types.is_empty());
+    let expr_type = input_types.first().unwrap_or(&DataType::Null);
+
+    // Handles the most common case where NULL is unexpected
+    if !expr_type.is_null() {
+        return Ok(expr_type.clone());
+    }
+
+    let default_value_type = input_types.get(2).unwrap_or(&DataType::Null);
+    Ok(default_value_type.clone())
+}
+
+/// Handles type coercion and null value refinement for default value
+/// argument depending on the data type of the input expression.
+fn parse_default_value(
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    input_types: &[DataType],
+) -> Result<ScalarValue> {
+    let expr_type = parse_expr_type(input_types)?;
+    let unparsed = get_scalar_value_from_args(input_exprs, 2)?;
+
+    unparsed
+        .filter(|v| !v.data_type().is_null())
+        .map(|v| v.cast_to(&expr_type))
+        .unwrap_or(ScalarValue::try_from(expr_type))
 }
 
 #[derive(Debug)]
-pub(crate) struct WindowShiftEvaluator {
+struct WindowShiftEvaluator {
     shift_offset: i64,
     default_value: ScalarValue,
     ignore_nulls: bool,
@@ -205,7 +355,7 @@ fn shift_with_default_value(
     offset: i64,
     default_value: &ScalarValue,
 ) -> Result<ArrayRef> {
-    use arrow::compute::concat;
+    use datafusion_common::arrow::compute::concat;
 
     let value_len = array.len() as i64;
     if offset == 0 {
@@ -402,19 +552,22 @@ impl PartitionEvaluator for WindowShiftEvaluator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::Column;
-    use arrow::{array::*, datatypes::*};
+    use arrow::array::*;
     use datafusion_common::cast::as_int32_array;
-
-    fn test_i32_result(expr: WindowShift, expected: Int32Array) -> Result<()> {
+    use datafusion_physical_expr::expressions::{Column, Literal};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+    fn test_i32_result(
+        expr: WindowShift,
+        partition_evaluator_args: PartitionEvaluatorArgs,
+        expected: Int32Array,
+    ) -> Result<()> {
         let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, -2, 3, -4, 5, -6, 7, 8]));
         let values = vec![arr];
-        let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]);
-        let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
-        let values = expr.evaluate_args(&batch)?;
+        let num_rows = values.len();
         let result = expr
-            .create_evaluator()?
-            .evaluate_all(&values, batch.num_rows())?;
+            .partition_evaluator(partition_evaluator_args)?
+            .evaluate_all(&values, num_rows)?;
         let result = as_int32_array(&result)?;
         assert_eq!(expected, *result);
         Ok(())
@@ -466,16 +619,12 @@ mod tests {
     }
 
     #[test]
-    fn lead_lag_window_shift() -> Result<()> {
+    fn test_lead_window_shift() -> Result<()> {
+        let expr = Arc::new(Column::new("c3", 0)) as Arc<dyn PhysicalExpr>;
+
         test_i32_result(
-            lead(
-                "lead".to_owned(),
-                DataType::Int32,
-                Arc::new(Column::new("c3", 0)),
-                None,
-                ScalarValue::Null.cast_to(&DataType::Int32)?,
-                false,
-            ),
+            WindowShift::lead(),
+            PartitionEvaluatorArgs::new(&[expr], &[DataType::Int32], false, false),
             [
                 Some(-2),
                 Some(3),
@@ -488,17 +637,16 @@ mod tests {
             ]
             .iter()
             .collect::<Int32Array>(),
-        )?;
+        )
+    }
+
+    #[test]
+    fn test_lag_window_shift() -> Result<()> {
+        let expr = Arc::new(Column::new("c3", 0)) as Arc<dyn PhysicalExpr>;
 
         test_i32_result(
-            lag(
-                "lead".to_owned(),
-                DataType::Int32,
-                Arc::new(Column::new("c3", 0)),
-                None,
-                ScalarValue::Null.cast_to(&DataType::Int32)?,
-                false,
-            ),
+            WindowShift::lag(),
+            PartitionEvaluatorArgs::new(&[expr], &[DataType::Int32], false, false),
             [
                 None,
                 Some(1),
@@ -511,17 +659,24 @@ mod tests {
             ]
             .iter()
             .collect::<Int32Array>(),
-        )?;
+        )
+    }
+
+    #[test]
+    fn test_lag_with_default() -> Result<()> {
+        let expr = Arc::new(Column::new("c3", 0)) as Arc<dyn PhysicalExpr>;
+        let shift_offset =
+            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
+        let default_value = Arc::new(Literal::new(ScalarValue::Int32(Some(100))))
+            as Arc<dyn PhysicalExpr>;
+
+        let input_exprs = &[expr, shift_offset, default_value];
+        let input_types: &[DataType] =
+            &[DataType::Int32, DataType::Int32, DataType::Int32];
 
         test_i32_result(
-            lag(
-                "lead".to_owned(),
-                DataType::Int32,
-                Arc::new(Column::new("c3", 0)),
-                None,
-                ScalarValue::Int32(Some(100)),
-                false,
-            ),
+            WindowShift::lag(),
+            PartitionEvaluatorArgs::new(input_exprs, input_types, false, false),
             [
                 Some(100),
                 Some(1),
@@ -534,7 +689,6 @@ mod tests {
             ]
             .iter()
             .collect::<Int32Array>(),
-        )?;
-        Ok(())
+        )
     }
 }
diff --git a/datafusion/functions-window/src/lib.rs b/datafusion/functions-window/src/lib.rs
index ef624e13e6..5a2aafa289 100644
--- a/datafusion/functions-window/src/lib.rs
+++ b/datafusion/functions-window/src/lib.rs
@@ -31,11 +31,17 @@ use datafusion_expr::WindowUDF;
 
 #[macro_use]
 pub mod macros;
+
+pub mod lead_lag;
+
 pub mod rank;
 pub mod row_number;
+mod utils;
 
 /// Fluent-style API for creating `Expr`s
 pub mod expr_fn {
+    pub use super::lead_lag::lag;
+    pub use super::lead_lag::lead;
     pub use super::rank::{dense_rank, percent_rank, rank};
     pub use super::row_number::row_number;
 }
@@ -44,6 +50,8 @@ pub mod expr_fn {
 pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
     vec![
         row_number::row_number_udwf(),
+        lead_lag::lead_udwf(),
+        lead_lag::lag_udwf(),
         rank::rank_udwf(),
         rank::dense_rank_udwf(),
         rank::percent_rank_udwf(),
diff --git a/datafusion/functions-window/src/utils.rs b/datafusion/functions-window/src/utils.rs
new file mode 100644
index 0000000000..69f68aa78f
--- /dev/null
+++ b/datafusion/functions-window/src/utils.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
+use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use std::sync::Arc;
+
+pub(crate) fn get_signed_integer(value: ScalarValue) -> Result<i64> {
+    if value.is_null() {
+        return Ok(0);
+    }
+
+    if !value.data_type().is_integer() {
+        return exec_err!("Expected an integer value");
+    }
+
+    value.cast_to(&DataType::Int64)?.try_into()
+}
+
+pub(crate) fn get_scalar_value_from_args(
+    args: &[Arc<dyn PhysicalExpr>],
+    index: usize,
+) -> Result<Option<ScalarValue>> {
+    Ok(if let Some(field) = args.get(index) {
+        let tmp = field
+            .as_any()
+            .downcast_ref::<Literal>()
+            .ok_or_else(|| DataFusionError::NotImplemented(
+                format!("There is only support Literal types for field at idx: {index} in Window Function"),
+            ))?
+            .value()
+            .clone();
+        Some(tmp)
+    } else {
+        None
+    })
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index e07e11e431..54b8aafdb4 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -36,7 +36,6 @@ mod unknown_column;
 /// Module with some convenient methods used in expression building
 pub use crate::aggregate::stats::StatsType;
 pub use crate::window::cume_dist::{cume_dist, CumeDist};
-pub use crate::window::lead_lag::{lag, lead, WindowShift};
 pub use crate::window::nth_value::NthValue;
 pub use crate::window::ntile::Ntile;
 pub use crate::PhysicalSortExpr;
diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs
index 938bdac50f..c0fe3c2933 100644
--- a/datafusion/physical-expr/src/window/mod.rs
+++ b/datafusion/physical-expr/src/window/mod.rs
@@ -19,7 +19,6 @@ mod aggregate;
 mod built_in;
 mod built_in_window_function_expr;
 pub(crate) mod cume_dist;
-pub(crate) mod lead_lag;
 pub(crate) mod nth_value;
 pub(crate) mod ntile;
 mod sliding_aggregate;
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index e6a773f6b1..adf61f27bc 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -21,7 +21,7 @@ use std::borrow::Borrow;
 use std::sync::Arc;
 
 use crate::{
-    expressions::{cume_dist, lag, lead, Literal, NthValue, Ntile, PhysicalSortExpr},
+    expressions::{cume_dist, Literal, NthValue, Ntile, PhysicalSortExpr},
     ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr,
 };
 
@@ -48,6 +48,7 @@ mod utils;
 mod window_agg_exec;
 
 pub use bounded_window_agg_exec::BoundedWindowAggExec;
+use datafusion_functions_window_common::expr::ExpressionArgs;
 use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
 use datafusion_physical_expr::expressions::Column;
@@ -206,52 +207,6 @@ fn get_unsigned_integer(value: ScalarValue) -> Result<u64> {
     value.cast_to(&DataType::UInt64)?.try_into()
 }
 
-fn get_casted_value(
-    default_value: Option<ScalarValue>,
-    dtype: &DataType,
-) -> Result<ScalarValue> {
-    match default_value {
-        Some(v) if !v.data_type().is_null() => v.cast_to(dtype),
-        // If None or Null datatype
-        _ => ScalarValue::try_from(dtype),
-    }
-}
-
-/// Rewrites the NULL expression (1st argument) with an expression
-/// which is the same data type as the default value (3rd argument).
-/// Also rewrites the return type with the same data type as the
-/// default value.
-///
-/// If a default value is not provided, or it is NULL the original
-/// expression (1st argument) and return type is returned without
-/// any modifications.
-fn rewrite_null_expr_and_data_type(
-    args: &[Arc<dyn PhysicalExpr>],
-    expr_type: &DataType,
-) -> Result<(Arc<dyn PhysicalExpr>, DataType)> {
-    assert!(!args.is_empty());
-    let expr = Arc::clone(&args[0]);
-
-    // The input expression and the return is type is unchanged
-    // when the input expression is not NULL.
-    if !expr_type.is_null() {
-        return Ok((expr, expr_type.clone()));
-    }
-
-    get_scalar_value_from_args(args, 2)?
-        .and_then(|value| {
-            ScalarValue::try_from(value.data_type().clone())
-                .map(|sv| {
-                    Ok((
-                        Arc::new(Literal::new(sv)) as Arc<dyn PhysicalExpr>,
-                        value.data_type().clone(),
-                    ))
-                })
-                .ok()
-        })
-        .unwrap_or(Ok((expr, expr_type.clone())))
-}
-
 fn create_built_in_window_expr(
     fun: &BuiltInWindowFunction,
     args: &[Arc<dyn PhysicalExpr>],
@@ -286,42 +241,6 @@ fn create_built_in_window_expr(
                 Arc::new(Ntile::new(name, n as u64, out_data_type))
             }
         }
-        BuiltInWindowFunction::Lag => {
-            // rewrite NULL expression and the return datatype
-            let (arg, out_data_type) =
-                rewrite_null_expr_and_data_type(args, out_data_type)?;
-            let shift_offset = get_scalar_value_from_args(args, 1)?
-                .map(get_signed_integer)
-                .map_or(Ok(None), |v| v.map(Some))?;
-            let default_value =
-                get_casted_value(get_scalar_value_from_args(args, 2)?, &out_data_type)?;
-            Arc::new(lag(
-                name,
-                default_value.data_type().clone(),
-                arg,
-                shift_offset,
-                default_value,
-                ignore_nulls,
-            ))
-        }
-        BuiltInWindowFunction::Lead => {
-            // rewrite NULL expression and the return datatype
-            let (arg, out_data_type) =
-                rewrite_null_expr_and_data_type(args, out_data_type)?;
-            let shift_offset = get_scalar_value_from_args(args, 1)?
-                .map(get_signed_integer)
-                .map_or(Ok(None), |v| v.map(Some))?;
-            let default_value =
-                get_casted_value(get_scalar_value_from_args(args, 2)?, &out_data_type)?;
-            Arc::new(lead(
-                name,
-                default_value.data_type().clone(),
-                arg,
-                shift_offset,
-                default_value,
-                ignore_nulls,
-            ))
-        }
         BuiltInWindowFunction::NthValue => {
             let arg = Arc::clone(&args[0]);
             let n = get_signed_integer(
@@ -415,7 +334,8 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        self.args.clone()
+        self.fun
+            .expressions(ExpressionArgs::new(&self.args, &self.input_types))
     }
 
     fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 5256f7473c..9964ab498f 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -515,8 +515,8 @@ enum BuiltInWindowFunction {
   //  PERCENT_RANK = 3;
   CUME_DIST = 4;
   NTILE = 5;
-  LAG = 6;
-  LEAD = 7;
+  // LAG = 6;
+  // LEAD = 7;
   FIRST_VALUE = 8;
   LAST_VALUE = 9;
   NTH_VALUE = 10;
@@ -528,7 +528,7 @@ message WindowExprNode {
     string udaf = 3;
     string udwf = 9;
   }
-  LogicalExprNode expr = 4;
+  repeated LogicalExprNode exprs = 4;
   repeated LogicalExprNode partition_by = 5;
   repeated SortExprNode order_by = 6;
   // repeated LogicalExprNode filter = 7;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index e876008e85..4417d11496 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -1664,8 +1664,6 @@ impl serde::Serialize for BuiltInWindowFunction {
             Self::Unspecified => "UNSPECIFIED",
             Self::CumeDist => "CUME_DIST",
             Self::Ntile => "NTILE",
-            Self::Lag => "LAG",
-            Self::Lead => "LEAD",
             Self::FirstValue => "FIRST_VALUE",
             Self::LastValue => "LAST_VALUE",
             Self::NthValue => "NTH_VALUE",
@@ -1683,8 +1681,6 @@ impl<'de> serde::Deserialize<'de> for BuiltInWindowFunction {
             "UNSPECIFIED",
             "CUME_DIST",
             "NTILE",
-            "LAG",
-            "LEAD",
             "FIRST_VALUE",
             "LAST_VALUE",
             "NTH_VALUE",
@@ -1731,8 +1727,6 @@ impl<'de> serde::Deserialize<'de> for BuiltInWindowFunction {
                     "UNSPECIFIED" => Ok(BuiltInWindowFunction::Unspecified),
                     "CUME_DIST" => Ok(BuiltInWindowFunction::CumeDist),
                     "NTILE" => Ok(BuiltInWindowFunction::Ntile),
-                    "LAG" => Ok(BuiltInWindowFunction::Lag),
-                    "LEAD" => Ok(BuiltInWindowFunction::Lead),
                     "FIRST_VALUE" => Ok(BuiltInWindowFunction::FirstValue),
                     "LAST_VALUE" => Ok(BuiltInWindowFunction::LastValue),
                     "NTH_VALUE" => Ok(BuiltInWindowFunction::NthValue),
@@ -21157,7 +21151,7 @@ impl serde::Serialize for WindowExprNode {
     {
         use serde::ser::SerializeStruct;
         let mut len = 0;
-        if self.expr.is_some() {
+        if !self.exprs.is_empty() {
             len += 1;
         }
         if !self.partition_by.is_empty() {
@@ -21176,8 +21170,8 @@ impl serde::Serialize for WindowExprNode {
             len += 1;
         }
         let mut struct_ser = serializer.serialize_struct("datafusion.WindowExprNode", len)?;
-        if let Some(v) = self.expr.as_ref() {
-            struct_ser.serialize_field("expr", v)?;
+        if !self.exprs.is_empty() {
+            struct_ser.serialize_field("exprs", &self.exprs)?;
         }
         if !self.partition_by.is_empty() {
             struct_ser.serialize_field("partitionBy", &self.partition_by)?;
@@ -21218,7 +21212,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
         D: serde::Deserializer<'de>,
     {
         const FIELDS: &[&str] = &[
-            "expr",
+            "exprs",
             "partition_by",
             "partitionBy",
             "order_by",
@@ -21235,7 +21229,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
-            Expr,
+            Exprs,
             PartitionBy,
             OrderBy,
             WindowFrame,
@@ -21264,7 +21258,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
                         E: serde::de::Error,
                     {
                         match value {
-                            "expr" => Ok(GeneratedField::Expr),
+                            "exprs" => Ok(GeneratedField::Exprs),
                             "partitionBy" | "partition_by" => Ok(GeneratedField::PartitionBy),
                             "orderBy" | "order_by" => Ok(GeneratedField::OrderBy),
                             "windowFrame" | "window_frame" => Ok(GeneratedField::WindowFrame),
@@ -21291,7 +21285,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
                 where
                     V: serde::de::MapAccess<'de>,
             {
-                let mut expr__ = None;
+                let mut exprs__ = None;
                 let mut partition_by__ = None;
                 let mut order_by__ = None;
                 let mut window_frame__ = None;
@@ -21299,11 +21293,11 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
                 let mut window_function__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
-                        GeneratedField::Expr => {
-                            if expr__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("expr"));
+                        GeneratedField::Exprs => {
+                            if exprs__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("exprs"));
                             }
-                            expr__ = map_.next_value()?;
+                            exprs__ = Some(map_.next_value()?);
                         }
                         GeneratedField::PartitionBy => {
                             if partition_by__.is_some() {
@@ -21352,7 +21346,7 @@ impl<'de> serde::Deserialize<'de> for WindowExprNode {
                     }
                 }
                 Ok(WindowExprNode {
-                    expr: expr__,
+                    exprs: exprs__.unwrap_or_default(),
                     partition_by: partition_by__.unwrap_or_default(),
                     order_by: order_by__.unwrap_or_default(),
                     window_frame: window_frame__,
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 2aa14f7e80..d3fe031a48 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -538,7 +538,7 @@ pub mod logical_expr_node {
         TryCast(::prost::alloc::boxed::Box<super::TryCastNode>),
         /// window expressions
         #[prost(message, tag = "18")]
-        WindowExpr(::prost::alloc::boxed::Box<super::WindowExprNode>),
+        WindowExpr(super::WindowExprNode),
         /// AggregateUDF expressions
         #[prost(message, tag = "19")]
         AggregateUdfExpr(::prost::alloc::boxed::Box<super::AggregateUdfExprNode>),
@@ -735,8 +735,8 @@ pub struct ScalarUdfExprNode {
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct WindowExprNode {
-    #[prost(message, optional, boxed, tag = "4")]
-    pub expr: ::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
+    #[prost(message, repeated, tag = "4")]
+    pub exprs: ::prost::alloc::vec::Vec<LogicalExprNode>,
     #[prost(message, repeated, tag = "5")]
     pub partition_by: ::prost::alloc::vec::Vec<LogicalExprNode>,
     #[prost(message, repeated, tag = "6")]
@@ -1828,8 +1828,8 @@ pub enum BuiltInWindowFunction {
     ///   PERCENT_RANK = 3;
     CumeDist = 4,
     Ntile = 5,
-    Lag = 6,
-    Lead = 7,
+    /// LAG = 6;
+    /// LEAD = 7;
     FirstValue = 8,
     LastValue = 9,
     NthValue = 10,
@@ -1844,8 +1844,6 @@ impl BuiltInWindowFunction {
             Self::Unspecified => "UNSPECIFIED",
             Self::CumeDist => "CUME_DIST",
             Self::Ntile => "NTILE",
-            Self::Lag => "LAG",
-            Self::Lead => "LEAD",
             Self::FirstValue => "FIRST_VALUE",
             Self::LastValue => "LAST_VALUE",
             Self::NthValue => "NTH_VALUE",
@@ -1857,8 +1855,6 @@ impl BuiltInWindowFunction {
             "UNSPECIFIED" => Some(Self::Unspecified),
             "CUME_DIST" => Some(Self::CumeDist),
             "NTILE" => Some(Self::Ntile),
-            "LAG" => Some(Self::Lag),
-            "LEAD" => Some(Self::Lead),
             "FIRST_VALUE" => Some(Self::FirstValue),
             "LAST_VALUE" => Some(Self::LastValue),
             "NTH_VALUE" => Some(Self::NthValue),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 32e1b68203..20d007048a 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -142,8 +142,6 @@ impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction {
     fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self {
         match built_in_function {
             protobuf::BuiltInWindowFunction::Unspecified => todo!(),
-            protobuf::BuiltInWindowFunction::Lag => Self::Lag,
-            protobuf::BuiltInWindowFunction::Lead => Self::Lead,
             protobuf::BuiltInWindowFunction::FirstValue => Self::FirstValue,
             protobuf::BuiltInWindowFunction::CumeDist => Self::CumeDist,
             protobuf::BuiltInWindowFunction::Ntile => Self::Ntile,
@@ -286,10 +284,7 @@ pub fn parse_expr(
                         .map_err(|_| Error::unknown("BuiltInWindowFunction", *i))?
                         .into();
 
-                    let args =
-                        parse_optional_expr(expr.expr.as_deref(), registry, codec)?
-                            .map(|e| vec![e])
-                            .unwrap_or_else(Vec::new);
+                    let args = parse_exprs(&expr.exprs, registry, codec)?;
 
                     Expr::WindowFunction(WindowFunction::new(
                         expr::WindowFunctionDefinition::BuiltInWindowFunction(
@@ -309,10 +304,7 @@ pub fn parse_expr(
                         None => registry.udaf(udaf_name)?,
                     };
 
-                    let args =
-                        parse_optional_expr(expr.expr.as_deref(), registry, codec)?
-                            .map(|e| vec![e])
-                            .unwrap_or_else(Vec::new);
+                    let args = parse_exprs(&expr.exprs, registry, codec)?;
                     Expr::WindowFunction(WindowFunction::new(
                         expr::WindowFunctionDefinition::AggregateUDF(udaf_function),
                         args,
@@ -329,10 +321,7 @@ pub fn parse_expr(
                         None => registry.udwf(udwf_name)?,
                     };
 
-                    let args =
-                        parse_optional_expr(expr.expr.as_deref(), registry, codec)?
-                            .map(|e| vec![e])
-                            .unwrap_or_else(Vec::new);
+                    let args = parse_exprs(&expr.exprs, registry, codec)?;
                     Expr::WindowFunction(WindowFunction::new(
                         expr::WindowFunctionDefinition::WindowUDF(udwf_function),
                         args,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 07823b422f..15fec3a8b2 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -119,8 +119,6 @@ impl From<&BuiltInWindowFunction> for protobuf::BuiltInWindowFunction {
             BuiltInWindowFunction::NthValue => Self::NthValue,
             BuiltInWindowFunction::Ntile => Self::Ntile,
             BuiltInWindowFunction::CumeDist => Self::CumeDist,
-            BuiltInWindowFunction::Lag => Self::Lag,
-            BuiltInWindowFunction::Lead => Self::Lead,
         }
     }
 }
@@ -333,25 +331,19 @@ pub fn serialize_expr(
                     )
                 }
             };
-            let arg_expr: Option<Box<protobuf::LogicalExprNode>> = if !args.is_empty() {
-                let arg = &args[0];
-                Some(Box::new(serialize_expr(arg, codec)?))
-            } else {
-                None
-            };
             let partition_by = serialize_exprs(partition_by, codec)?;
             let order_by = serialize_sorts(order_by, codec)?;
 
             let window_frame: Option<protobuf::WindowFrame> =
                 Some(window_frame.try_into()?);
-            let window_expr = Box::new(protobuf::WindowExprNode {
-                expr: arg_expr,
+            let window_expr = protobuf::WindowExprNode {
+                exprs: serialize_exprs(args, codec)?,
                 window_function: Some(window_function),
                 partition_by,
                 order_by,
                 window_frame,
                 fun_definition,
-            });
+            };
             protobuf::LogicalExprNode {
                 expr_type: Some(ExprType::WindowExpr(window_expr)),
             }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 85d4fe8a16..6072baca68 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -25,7 +25,6 @@ use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
 use datafusion::physical_plan::expressions::{
     BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, InListExpr, IsNotNullExpr,
     IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, TryCastExpr,
-    WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr};
@@ -119,25 +118,6 @@ pub fn serialize_physical_window_expr(
                 )))),
             );
             protobuf::BuiltInWindowFunction::Ntile
-        } else if let Some(window_shift_expr) =
-            built_in_fn_expr.downcast_ref::<WindowShift>()
-        {
-            args.insert(
-                1,
-                Arc::new(Literal::new(datafusion_common::ScalarValue::Int64(Some(
-                    window_shift_expr.get_shift_offset(),
-                )))),
-            );
-            args.insert(
-                2,
-                Arc::new(Literal::new(window_shift_expr.get_default_value())),
-            );
-
-            if window_shift_expr.get_shift_offset() >= 0 {
-                protobuf::BuiltInWindowFunction::Lag
-            } else {
-                protobuf::BuiltInWindowFunction::Lead
-            }
         } else if let Some(nth_value_expr) = built_in_fn_expr.downcast_ref::<NthValue>() {
             match nth_value_expr.get_kind() {
                 NthValueKind::First => protobuf::BuiltInWindowFunction::FirstValue,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index ffa8fc1eef..c017395d97 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -47,8 +47,10 @@ use datafusion::functions_aggregate::expr_fn::{
 };
 use datafusion::functions_aggregate::min_max::max_udaf;
 use datafusion::functions_nested::map::map;
-use datafusion::functions_window::rank::{dense_rank, percent_rank, rank, rank_udwf};
-use datafusion::functions_window::row_number::row_number;
+use datafusion::functions_window::expr_fn::{
+    dense_rank, lag, lead, percent_rank, rank, row_number,
+};
+use datafusion::functions_window::rank::rank_udwf;
 use datafusion::prelude::*;
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
 use datafusion_common::config::TableOptions;
@@ -942,6 +944,12 @@ async fn roundtrip_expr_api() -> Result<()> {
         rank(),
         dense_rank(),
         percent_rank(),
+        lead(col("b"), None, None),
+        lead(col("b"), Some(2), None),
+        lead(col("b"), Some(2), Some(ScalarValue::from(100))),
+        lag(col("b"), None, None),
+        lag(col("b"), Some(2), None),
+        lag(col("b"), Some(2), Some(ScalarValue::from(100))),
         nth_value(col("b"), 1, vec![]),
         nth_value(
             col("b"),
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index a3d0ff4383..fb7afdda2e 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -503,9 +503,9 @@ logical_plan
 12)----Projection: Int64(1) AS cnt
 13)------Limit: skip=0, fetch=3
 14)--------EmptyRelation
-15)----Projection: LEAD(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS cnt
+15)----Projection: lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS cnt
 16)------Limit: skip=0, fetch=3
-17)--------WindowAggr: windowExpr=[[LEAD(b.c1, Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+17)--------WindowAggr: windowExpr=[[lead(b.c1, Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
 18)----------SubqueryAlias: b
 19)------------Projection: Int64(1) AS c1
 20)--------------EmptyRelation
@@ -528,8 +528,8 @@ physical_plan
 16)------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
 17)------ProjectionExec: expr=[1 as cnt]
 18)--------PlaceholderRowExec
-19)------ProjectionExec: expr=[LEAD(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
-20)--------BoundedWindowAggExec: wdw=[LEAD(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "LEAD(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+19)------ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
+20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
 21)----------ProjectionExec: expr=[1 as c1]
 22)------------PlaceholderRowExec
 
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 1b612f9212..b3f2786d3d 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1376,16 +1376,16 @@ EXPLAIN SELECT
     LIMIT 5
 ----
 logical_plan
-01)Projection: aggregate_test_100.c9, first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS fv1, first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS lag1, LAG(aggregate_test_100.c9,Int64( [...]
+01)Projection: aggregate_test_100.c9, first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS fv1, first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS fv2, lag(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS lag1, lag(aggregate_test_100.c9,Int64( [...]
 02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-04)------WindowAggr: windowExpr=[[first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]]
+03)----WindowAggr: windowExpr=[[first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, lag(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, lead(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+04)------WindowAggr: windowExpr=[[first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, lag(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, lead(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]]
 05)--------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
-01)ProjectionExec: expr=[c9@0 as c9, first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as fv1, first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as lag1, LAG(aggregate_test_100.c9,I [...]
+01)ProjectionExec: expr=[c9@0 as c9, first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as fv1, first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as fv2, lag(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as lag1, lag(aggregate_test_100.c9,I [...]
 02)--GlobalLimitExec: skip=0, fetch=5
-03)----BoundedWindowAggExec: wdw=[first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)), [...]
-04)------BoundedWindowAggExec: wdw=[first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt6 [...]
+03)----BoundedWindowAggExec: wdw=[first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)), [...]
+04)------BoundedWindowAggExec: wdw=[first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "first_value(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt6 [...]
 05)--------SortExec: expr=[c9@0 DESC], preserve_partitioning=[false]
 06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
@@ -2636,15 +2636,15 @@ EXPLAIN SELECT
 ----
 logical_plan
 01)Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5
-02)--Projection: annotated_data_finite.ts, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, last_value(annot [...]
-03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER  [...]
-04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col [...]
+02)--Projection: annotated_data_finite.ts, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, last_value(annot [...]
+03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER  [...]
+04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, last_value(annotated_data_finite.inc_col [...]
 05)--------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 01)SortExec: TopK(fetch=5), expr=[ts@0 DESC], preserve_partitioning=[false]
-02)--ProjectionExec: expr=[ts@0 as ts, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, last_value( [...]
-03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bo [...]
-04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), e [...]
+02)--ProjectionExec: expr=[ts@0 as ts, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, last_value( [...]
+03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bo [...]
+04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), e [...]
 05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIIII
@@ -4971,6 +4971,26 @@ SELECT LAG(NULL, 1, false) OVER () FROM t1;
 ----
 false
 
+query B
+SELECT LEAD(NULL, 0, true) OVER () FROM t1;
+----
+NULL
+
+query B
+SELECT LAG(NULL, 0, true) OVER () FROM t1;
+----
+NULL
+
+query B
+SELECT LEAD(NULL, 1, true) OVER () FROM t1;
+----
+true
+
+query B
+SELECT LAG(NULL, 1, true) OVER () FROM t1;
+----
+true
+
 statement ok
 insert into t1 values (2);
 
@@ -4986,6 +5006,18 @@ SELECT LAG(NULL, 1, false) OVER () FROM t1;
 false
 NULL
 
+query B
+SELECT LEAD(NULL, 1, true) OVER () FROM t1;
+----
+NULL
+true
+
+query B
+SELECT LAG(NULL, 1, true) OVER () FROM t1;
+----
+true
+NULL
+
 statement ok
 DROP TABLE t1;
 
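
Note on the new `window.slt` cases above: they exercise offset 0 (the current row's own value is returned, so the default is ignored) and offset 1 on one- and two-row partitions (the default fills positions that fall outside the partition). The following is a minimal standalone sketch of those semantics, not the DataFusion implementation. The names `shift`, `lag` and `lead` here are hypothetical helpers over plain slices, and treating `lead` as a `lag` with a negated offset is an assumption that mirrors the sign check in the removed proto code.

```rust
/// Hypothetical helper: output row `i` takes the value at row `i - offset`,
/// or `default` when that source row lies outside the partition.
fn shift<T: Clone>(values: &[Option<T>], offset: i64, default: Option<T>) -> Vec<Option<T>> {
    let len = values.len() as i64;
    (0..len)
        .map(|i| {
            // checked_sub guards against overflow for extreme offsets.
            match i.checked_sub(offset).filter(|src| (0..len).contains(src)) {
                Some(src) => values[src as usize].clone(),
                None => default.clone(),
            }
        })
        .collect()
}

/// LAG looks `offset` rows back (a positive shift).
fn lag<T: Clone>(values: &[Option<T>], offset: i64, default: Option<T>) -> Vec<Option<T>> {
    shift(values, offset, default)
}

/// LEAD looks `offset` rows ahead (a negative shift).
fn lead<T: Clone>(values: &[Option<T>], offset: i64, default: Option<T>) -> Vec<Option<T>> {
    shift(values, offset.saturating_neg(), default)
}
```

With a two-row partition of NULLs and a default of `true`, `lead(.., 1, ..)` yields NULL then `true`, and `lag(.., 1, ..)` yields `true` then NULL, which matches the expected rows in the tests.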

