This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 939ef9e9bc Convert `rank` / `dense_rank` and `percent_rank` builtin 
functions to UDWF (#12718)
939ef9e9bc is described below

commit 939ef9e9bcf95be4215569609d189f2a94280d86
Author: Jagdish Parihar <[email protected]>
AuthorDate: Fri Oct 11 01:58:35 2024 +0530

    Convert `rank` / `dense_rank` and `percent_rank` builtin functions to UDWF 
(#12718)
    
    * wip: converting rank builtin function to UDWF
    
    * commented BuiltInWindowFunction in datafusion.proto and fixed issue 
related to Datafusion window function
    
    * implemented rank.rs, percent_rank.rs and dense_rank.rs in datafusion 
functions-window
    
    * removed a test from built in window function test for percent_rank and 
updated pbjson fields
    
    * removed unnecessary code
    
    * added window_functions field to the MockSessionState
    
    * updated rank, percent_rank and dense_rank udwf to use macros
    
    * wip: fix rank functionality in sql integration
    
    * fixed rank udwf not found issue in sql_integration.rs
    
    * evaluating rank, percent_rank and dense_rank udwf with evaluate_with_rank 
function
    
    * fixed rank projection test
    
    * wip: fixing the percent_rank() documentation
    
    * fixed the docs error issue
    
    * fixed data type of the percent_rank udwf
    
    * updated prost.rs file
    
    * updated test and documentation
    
    * Fix logical conflicts
    
    * tweak module documentation
    
    ---------
    
    Co-authored-by: jatin <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    |  23 +-
 datafusion/expr/src/built_in_window_function.rs    |  25 +-
 datafusion/expr/src/expr.rs                        |  12 -
 datafusion/expr/src/expr_fn.rs                     |   4 +-
 datafusion/expr/src/window_function.rs             |  21 --
 datafusion/functions-window/src/dense_rank.rs      | 205 ++++++++++++++
 datafusion/functions-window/src/lib.rs             |  13 +-
 datafusion/functions-window/src/percent_rank.rs    | 192 +++++++++++++
 datafusion/functions-window/src/rank.rs            | 220 +++++++++++++++
 datafusion/functions-window/src/row_number.rs      |   2 +-
 datafusion/physical-expr/src/expressions/mod.rs    |   1 -
 datafusion/physical-expr/src/window/mod.rs         |   1 -
 datafusion/physical-expr/src/window/rank.rs        | 313 ---------------------
 datafusion/physical-expr/src/window/window_expr.rs |  13 -
 datafusion/physical-plan/src/windows/mod.rs        |   8 +-
 datafusion/proto/proto/datafusion.proto            |   8 +-
 datafusion/proto/src/generated/pbjson.rs           |   9 -
 datafusion/proto/src/generated/prost.rs            |  12 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   3 -
 datafusion/proto/src/logical_plan/to_proto.rs      |   3 -
 datafusion/proto/src/physical_plan/to_proto.rs     |  13 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  18 +-
 datafusion/sql/tests/common/mod.rs                 |  11 +-
 datafusion/sql/tests/sql_integration.rs            |   6 +-
 .../sqllogictest/test_files/subquery_sort.slt      |   8 +-
 datafusion/sqllogictest/test_files/window.slt      |   8 +-
 datafusion/substrait/tests/cases/serialize.rs      |   4 +-
 27 files changed, 687 insertions(+), 469 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index a6c2cf700c..b9881c9f23 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -45,6 +45,8 @@ use datafusion_physical_expr::{PhysicalExpr, 
PhysicalSortExpr};
 use test_utils::add_empty_batches;
 
 use datafusion::functions_window::row_number::row_number_udwf;
+use datafusion_functions_window::dense_rank::dense_rank_udwf;
+use datafusion_functions_window::rank::rank_udwf;
 use hashbrown::HashMap;
 use rand::distributions::Alphanumeric;
 use rand::rngs::StdRng;
@@ -224,9 +226,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
         // )
         (
             // Window function
-            
WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Rank),
+            WindowFunctionDefinition::WindowUDF(rank_udwf()),
             // its name
-            "RANK",
+            "rank",
             // no argument
             vec![],
             // Expected causality, for None cases causality will be determined 
from window frame boundaries
@@ -238,11 +240,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
         // )
         (
             // Window function
-            WindowFunctionDefinition::BuiltInWindowFunction(
-                BuiltInWindowFunction::DenseRank,
-            ),
+            WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
             // its name
-            "DENSE_RANK",
+            "dense_rank",
             // no argument
             vec![],
             // Expected causality, for None cases causality will be determined 
from window frame boundaries
@@ -382,19 +382,12 @@ fn get_random_function(
         );
         window_fn_map.insert(
             "rank",
-            (
-                WindowFunctionDefinition::BuiltInWindowFunction(
-                    BuiltInWindowFunction::Rank,
-                ),
-                vec![],
-            ),
+            (WindowFunctionDefinition::WindowUDF(rank_udwf()), vec![]),
         );
         window_fn_map.insert(
             "dense_rank",
             (
-                WindowFunctionDefinition::BuiltInWindowFunction(
-                    BuiltInWindowFunction::DenseRank,
-                ),
+                WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
                 vec![],
             ),
         );
diff --git a/datafusion/expr/src/built_in_window_function.rs 
b/datafusion/expr/src/built_in_window_function.rs
index b136d6cace..117ff08253 100644
--- a/datafusion/expr/src/built_in_window_function.rs
+++ b/datafusion/expr/src/built_in_window_function.rs
@@ -40,12 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
 /// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
 pub enum BuiltInWindowFunction {
-    /// rank of the current row with gaps; same as row_number of its first peer
-    Rank,
-    /// rank of the current row without gaps; this function counts peer groups
-    DenseRank,
-    /// relative rank of the current row: (rank - 1) / (total rows - 1)
-    PercentRank,
     /// relative rank of the current row: (number of rows preceding or peer 
with current row) / (total rows)
     CumeDist,
     /// integer ranging from 1 to the argument value, dividing the partition 
as equally as possible
@@ -72,9 +66,6 @@ impl BuiltInWindowFunction {
     pub fn name(&self) -> &str {
         use BuiltInWindowFunction::*;
         match self {
-            Rank => "RANK",
-            DenseRank => "DENSE_RANK",
-            PercentRank => "PERCENT_RANK",
             CumeDist => "CUME_DIST",
             Ntile => "NTILE",
             Lag => "LAG",
@@ -90,9 +81,6 @@ impl FromStr for BuiltInWindowFunction {
     type Err = DataFusionError;
     fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
         Ok(match name.to_uppercase().as_str() {
-            "RANK" => BuiltInWindowFunction::Rank,
-            "DENSE_RANK" => BuiltInWindowFunction::DenseRank,
-            "PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
             "CUME_DIST" => BuiltInWindowFunction::CumeDist,
             "NTILE" => BuiltInWindowFunction::Ntile,
             "LAG" => BuiltInWindowFunction::Lag,
@@ -127,12 +115,8 @@ impl BuiltInWindowFunction {
             })?;
 
         match self {
-            BuiltInWindowFunction::Rank
-            | BuiltInWindowFunction::DenseRank
-            | BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
-            BuiltInWindowFunction::PercentRank | 
BuiltInWindowFunction::CumeDist => {
-                Ok(DataType::Float64)
-            }
+            BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
+            BuiltInWindowFunction::CumeDist => Ok(DataType::Float64),
             BuiltInWindowFunction::Lag
             | BuiltInWindowFunction::Lead
             | BuiltInWindowFunction::FirstValue
@@ -145,10 +129,7 @@ impl BuiltInWindowFunction {
     pub fn signature(&self) -> Signature {
         // note: the physical expression must accept the type returned by this 
function or the execution panics.
         match self {
-            BuiltInWindowFunction::Rank
-            | BuiltInWindowFunction::DenseRank
-            | BuiltInWindowFunction::PercentRank
-            | BuiltInWindowFunction::CumeDist => Signature::any(0, 
Volatility::Immutable),
+            BuiltInWindowFunction::CumeDist => Signature::any(0, 
Volatility::Immutable),
             BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => {
                 Signature::one_of(
                     vec![
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 02a2edb980..723433f573 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -2598,15 +2598,6 @@ mod test {
         Ok(())
     }
 
-    #[test]
-    fn test_percent_rank_return_type() -> Result<()> {
-        let fun = find_df_window_func("percent_rank").unwrap();
-        let observed = fun.return_type(&[], &[], "")?;
-        assert_eq!(DataType::Float64, observed);
-
-        Ok(())
-    }
-
     #[test]
     fn test_cume_dist_return_type() -> Result<()> {
         let fun = find_df_window_func("cume_dist").unwrap();
@@ -2628,9 +2619,6 @@ mod test {
     #[test]
     fn test_window_function_case_insensitive() -> Result<()> {
         let names = vec![
-            "rank",
-            "dense_rank",
-            "percent_rank",
             "cume_dist",
             "ntile",
             "lag",
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index ea053b9fb1..7fd4e64e0e 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -701,7 +701,6 @@ pub fn interval_month_day_nano_lit(value: &str) -> Expr {
 /// # use datafusion_expr::test::function_stub::count;
 /// # use sqlparser::ast::NullTreatment;
 /// # use datafusion_expr::{ExprFunctionExt, lit, Expr, col};
-/// # use datafusion_expr::window_function::percent_rank;
 /// # // first_value is an aggregate function in another crate
 /// # fn first_value(_arg: Expr) -> Expr {
 /// unimplemented!() }
@@ -721,6 +720,9 @@ pub fn interval_month_day_nano_lit(value: &str) -> Expr {
 /// // Create a window expression for percent rank partitioned on column a
 /// // equivalent to:
 /// // `PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS LAST IGNORE 
NULLS)`
+/// // percent_rank is a UDWF (user-defined window function) in another crate
+/// # fn percent_rank() -> Expr {
+/// unimplemented!() }
 /// let window = percent_rank()
 ///     .partition_by(vec![col("a")])
 ///     .order_by(vec![col("b").sort(true, true)])
diff --git a/datafusion/expr/src/window_function.rs 
b/datafusion/expr/src/window_function.rs
index a80718147c..7ac6fb7d16 100644
--- a/datafusion/expr/src/window_function.rs
+++ b/datafusion/expr/src/window_function.rs
@@ -19,27 +19,6 @@ use datafusion_common::ScalarValue;
 
 use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};
 
-/// Create an expression to represent the `rank` window function
-pub fn rank() -> Expr {
-    Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Rank, 
vec![]))
-}
-
-/// Create an expression to represent the `dense_rank` window function
-pub fn dense_rank() -> Expr {
-    Expr::WindowFunction(WindowFunction::new(
-        BuiltInWindowFunction::DenseRank,
-        vec![],
-    ))
-}
-
-/// Create an expression to represent the `percent_rank` window function
-pub fn percent_rank() -> Expr {
-    Expr::WindowFunction(WindowFunction::new(
-        BuiltInWindowFunction::PercentRank,
-        vec![],
-    ))
-}
-
 /// Create an expression to represent the `cume_dist` window function
 pub fn cume_dist() -> Expr {
     Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::CumeDist, 
vec![]))
diff --git a/datafusion/functions-window/src/dense_rank.rs 
b/datafusion/functions-window/src/dense_rank.rs
new file mode 100644
index 0000000000..c969a7c46f
--- /dev/null
+++ b/datafusion/functions-window/src/dense_rank.rs
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `dense_rank` window function implementation
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::iter;
+use std::ops::Range;
+use std::sync::Arc;
+
+use crate::define_udwf_and_expr;
+use crate::rank::RankState;
+use datafusion_common::arrow::array::ArrayRef;
+use datafusion_common::arrow::array::UInt64Array;
+use datafusion_common::arrow::compute::SortOptions;
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
+use datafusion_common::utils::get_row_at_idx;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::{PartitionEvaluator, Signature, Volatility, 
WindowUDFImpl};
+use datafusion_functions_window_common::field;
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use field::WindowUDFFieldArgs;
+
+define_udwf_and_expr!(
+    DenseRank,
+    dense_rank,
+    "Returns rank of the current row without gaps. This function counts peer 
groups"
+);
+
+/// dense_rank expression
+#[derive(Debug)]
+pub struct DenseRank {
+    signature: Signature,
+}
+
+impl DenseRank {
+    /// Create a new `dense_rank` function
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::any(0, Volatility::Immutable),
+        }
+    }
+}
+
+impl Default for DenseRank {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl WindowUDFImpl for DenseRank {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "dense_rank"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn partition_evaluator(
+        &self,
+        _partition_evaluator_args: PartitionEvaluatorArgs,
+    ) -> Result<Box<dyn PartitionEvaluator>> {
+        Ok(Box::<DenseRankEvaluator>::default())
+    }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::UInt64, false))
+    }
+
+    fn sort_options(&self) -> Option<SortOptions> {
+        Some(SortOptions {
+            descending: false,
+            nulls_first: false,
+        })
+    }
+}
+
+/// Partition evaluator (and its state) for the `dense_rank` window UDF.
+#[derive(Debug, Default)]
+struct DenseRankEvaluator {
+    state: RankState,
+}
+
+impl PartitionEvaluator for DenseRankEvaluator {
+    fn is_causal(&self) -> bool {
+        // The dense_rank function doesn't need "future" values to emit 
results:
+        true
+    }
+
+    fn evaluate(
+        &mut self,
+        values: &[ArrayRef],
+        range: &Range<usize>,
+    ) -> Result<ScalarValue> {
+        let row_idx = range.start;
+        // There is no argument; `values` holds the ORDER BY column values 
(on which the rank is calculated)
+        let range_columns = values;
+        let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
+        let new_rank_encountered =
+            if let Some(state_last_rank_data) = &self.state.last_rank_data {
+                // if rank data changes, new rank is encountered
+                state_last_rank_data != &last_rank_data
+            } else {
+                // First rank seen
+                true
+            };
+
+        if new_rank_encountered {
+            self.state.last_rank_data = Some(last_rank_data);
+            self.state.last_rank_boundary += self.state.current_group_count;
+            self.state.current_group_count = 1;
+            self.state.n_rank += 1;
+        } else {
+            // data is still in the same rank
+            self.state.current_group_count += 1;
+        }
+
+        Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64)))
+    }
+
+    fn evaluate_all_with_rank(
+        &self,
+        _num_rows: usize,
+        ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        let result = Arc::new(UInt64Array::from_iter_values(
+            ranks_in_partition
+                .iter()
+                .zip(1u64..)
+                .flat_map(|(range, rank)| {
+                    let len = range.end - range.start;
+                    iter::repeat(rank).take(len)
+                }),
+        ));
+
+        Ok(result)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn include_rank(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion_common::cast::as_uint64_array;
+
+    fn test_with_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
+        test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
+    }
+
+    #[allow(clippy::single_range_in_vec_init)]
+    fn test_without_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
+        test_i32_result(expr, vec![0..8], expected)
+    }
+
+    fn test_i32_result(
+        expr: &DenseRank,
+        ranks: Vec<Range<usize>>,
+        expected: Vec<u64>,
+    ) -> Result<()> {
+        let args = PartitionEvaluatorArgs::default();
+        let result = expr
+            .partition_evaluator(args)?
+            .evaluate_all_with_rank(8, &ranks)?;
+        let result = as_uint64_array(&result)?;
+        let result = result.values();
+        assert_eq!(expected, *result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_dense_rank() -> Result<()> {
+        let r = DenseRank::default();
+        test_without_rank(&r, vec![1; 8])?;
+        test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
+        Ok(())
+    }
+}
diff --git a/datafusion/functions-window/src/lib.rs 
b/datafusion/functions-window/src/lib.rs
index 6e98bb0914..b727809908 100644
--- a/datafusion/functions-window/src/lib.rs
+++ b/datafusion/functions-window/src/lib.rs
@@ -31,16 +31,27 @@ use datafusion_expr::WindowUDF;
 
 #[macro_use]
 pub mod macros;
+pub mod dense_rank;
+pub mod percent_rank;
+pub mod rank;
 pub mod row_number;
 
 /// Fluent-style API for creating `Expr`s
 pub mod expr_fn {
+    pub use super::dense_rank::dense_rank;
+    pub use super::percent_rank::percent_rank;
+    pub use super::rank::rank;
     pub use super::row_number::row_number;
 }
 
 /// Returns all default window functions
 pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
-    vec![row_number::row_number_udwf()]
+    vec![
+        row_number::row_number_udwf(),
+        rank::rank_udwf(),
+        dense_rank::dense_rank_udwf(),
+        percent_rank::percent_rank_udwf(),
+    ]
 }
 /// Registers all enabled packages with a [`FunctionRegistry`]
 pub fn register_all(
diff --git a/datafusion/functions-window/src/percent_rank.rs 
b/datafusion/functions-window/src/percent_rank.rs
new file mode 100644
index 0000000000..147959f69b
--- /dev/null
+++ b/datafusion/functions-window/src/percent_rank.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `percent_rank` window function implementation
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::iter;
+use std::ops::Range;
+use std::sync::Arc;
+
+use crate::define_udwf_and_expr;
+use datafusion_common::arrow::array::ArrayRef;
+use datafusion_common::arrow::array::Float64Array;
+use datafusion_common::arrow::compute::SortOptions;
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
+use datafusion_common::{exec_err, Result, ScalarValue};
+use datafusion_expr::{PartitionEvaluator, Signature, Volatility, 
WindowUDFImpl};
+use datafusion_functions_window_common::field;
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use field::WindowUDFFieldArgs;
+
+define_udwf_and_expr!(
+    PercentRank,
+    percent_rank,
+    "Returns the relative rank of the current row: (rank - 1) / (total rows - 
1)"
+);
+
+/// percent_rank expression
+#[derive(Debug)]
+pub struct PercentRank {
+    signature: Signature,
+}
+
+impl PercentRank {
+    /// Create a new `percent_rank` function
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::any(0, Volatility::Immutable),
+        }
+    }
+}
+
+impl Default for PercentRank {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl WindowUDFImpl for PercentRank {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "percent_rank"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn partition_evaluator(
+        &self,
+        _partition_evaluator_args: PartitionEvaluatorArgs,
+    ) -> Result<Box<dyn PartitionEvaluator>> {
+        Ok(Box::<PercentRankEvaluator>::default())
+    }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::Float64, false))
+    }
+
+    fn sort_options(&self) -> Option<SortOptions> {
+        Some(SortOptions {
+            descending: false,
+            nulls_first: false,
+        })
+    }
+}
+
+/// Partition evaluator for the `percent_rank` window UDF (it keeps no state).
+#[derive(Debug, Default)]
+struct PercentRankEvaluator {}
+
+impl PartitionEvaluator for PercentRankEvaluator {
+    fn is_causal(&self) -> bool {
+        // percent_rank needs the total partition row count ("future" rows) 
to emit results, so it is not causal:
+        false
+    }
+
+    fn evaluate(
+        &mut self,
+        _values: &[ArrayRef],
+        _range: &Range<usize>,
+    ) -> Result<ScalarValue> {
+        exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
+    }
+
+    fn evaluate_all_with_rank(
+        &self,
+        num_rows: usize,
+        ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        let denominator = num_rows as f64;
+        let result =
+        // Returns the relative rank of the current row, that is (rank - 1) / 
(total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
+        Arc::new(Float64Array::from_iter_values(
+            ranks_in_partition
+                .iter()
+                .scan(0_u64, |acc, range| {
+                    let len = range.end - range.start;
+                    let value = (*acc as f64) / (denominator - 1.0).max(1.0);
+                    let result = iter::repeat(value).take(len);
+                    *acc += len as u64;
+                    Some(result)
+                })
+                .flatten(),
+        ));
+
+        Ok(result)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        false
+    }
+
+    fn include_rank(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion_common::cast::as_float64_array;
+
+    fn test_f64_result(
+        expr: &PercentRank,
+        num_rows: usize,
+        ranks: Vec<Range<usize>>,
+        expected: Vec<f64>,
+    ) -> Result<()> {
+        let args = PartitionEvaluatorArgs::default();
+        let result = expr
+            .partition_evaluator(args)?
+            .evaluate_all_with_rank(num_rows, &ranks)?;
+        let result = as_float64_array(&result)?;
+        let result = result.values();
+        assert_eq!(expected, *result);
+        Ok(())
+    }
+
+    #[test]
+    #[allow(clippy::single_range_in_vec_init)]
+    fn test_percent_rank() -> Result<()> {
+        let r = PercentRank::default();
+
+        // empty case
+        let expected = vec![0.0; 0];
+        test_f64_result(&r, 0, vec![0..0; 0], expected)?;
+
+        // singleton case
+        let expected = vec![0.0];
+        test_f64_result(&r, 1, vec![0..1], expected)?;
+
+        // uniform case
+        let expected = vec![0.0; 7];
+        test_f64_result(&r, 7, vec![0..7], expected)?;
+
+        // non-trivial case
+        let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
+        test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;
+
+        Ok(())
+    }
+}
diff --git a/datafusion/functions-window/src/rank.rs 
b/datafusion/functions-window/src/rank.rs
new file mode 100644
index 0000000000..c52dec9061
--- /dev/null
+++ b/datafusion/functions-window/src/rank.rs
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `rank` window function implementation
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::iter;
+use std::ops::Range;
+use std::sync::Arc;
+
+use crate::define_udwf_and_expr;
+use datafusion_common::arrow::array::ArrayRef;
+use datafusion_common::arrow::array::UInt64Array;
+use datafusion_common::arrow::compute::SortOptions;
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
+use datafusion_common::utils::get_row_at_idx;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::{PartitionEvaluator, Signature, Volatility, 
WindowUDFImpl};
+use datafusion_functions_window_common::field;
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use field::WindowUDFFieldArgs;
+
+define_udwf_and_expr!(
+    Rank,
+    rank,
+    "Returns rank of the current row with gaps. Same as `row_number` of its 
first peer"
+);
+
+/// rank expression
+#[derive(Debug)]
+pub struct Rank {
+    signature: Signature,
+}
+
+impl Rank {
+    /// Create a new `rank` function
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::any(0, Volatility::Immutable),
+        }
+    }
+}
+
+impl Default for Rank {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl WindowUDFImpl for Rank {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "rank"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn partition_evaluator(
+        &self,
+        _partition_evaluator_args: PartitionEvaluatorArgs,
+    ) -> Result<Box<dyn PartitionEvaluator>> {
+        Ok(Box::<RankEvaluator>::default())
+    }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::UInt64, false))
+    }
+
+    fn sort_options(&self) -> Option<SortOptions> {
+        Some(SortOptions {
+            descending: false,
+            nulls_first: false,
+        })
+    }
+}
+
+/// State shared by the `rank` and `dense_rank` window function evaluators.
+#[derive(Debug, Clone, Default)]
+pub struct RankState {
+    /// The last ORDER BY values seen; when these values change, we increase n_rank
+    pub last_rank_data: Option<Vec<ScalarValue>>,
+    /// The (0-based) row index at which the current rank group starts
+    pub last_rank_boundary: usize,
+    /// Keep the number of entries in current rank
+    pub current_group_count: usize,
+    /// Rank number kept from the start
+    pub n_rank: usize,
+}
+
+/// Partition evaluator (and its state) for the `rank` window UDF.
+#[derive(Debug, Default)]
+struct RankEvaluator {
+    state: RankState,
+}
+
+impl PartitionEvaluator for RankEvaluator {
+    fn is_causal(&self) -> bool {
+        // The rank function doesn't need "future" values to emit results:
+        true
+    }
+
+    fn evaluate(
+        &mut self,
+        values: &[ArrayRef],
+        range: &Range<usize>,
+    ) -> Result<ScalarValue> {
+        let row_idx = range.start;
+        // There is no argument; `values` holds the ORDER BY column values 
(on which the rank is calculated)
+        let range_columns = values;
+        let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
+        let new_rank_encountered =
+            if let Some(state_last_rank_data) = &self.state.last_rank_data {
+                // if rank data changes, new rank is encountered
+                state_last_rank_data != &last_rank_data
+            } else {
+                // First rank seen
+                true
+            };
+        if new_rank_encountered {
+            self.state.last_rank_data = Some(last_rank_data);
+            self.state.last_rank_boundary += self.state.current_group_count;
+            self.state.current_group_count = 1;
+            self.state.n_rank += 1;
+        } else {
+            // data is still in the same rank
+            self.state.current_group_count += 1;
+        }
+
+        Ok(ScalarValue::UInt64(Some(
+            self.state.last_rank_boundary as u64 + 1,
+        )))
+    }
+
+    fn evaluate_all_with_rank(
+        &self,
+        _num_rows: usize,
+        ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        let result = Arc::new(UInt64Array::from_iter_values(
+            ranks_in_partition
+                .iter()
+                .scan(1_u64, |acc, range| {
+                    let len = range.end - range.start;
+                    let result = iter::repeat(*acc).take(len);
+                    *acc += len as u64;
+                    Some(result)
+                })
+                .flatten(),
+        ));
+
+        Ok(result)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn include_rank(&self) -> bool {
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion_common::cast::as_uint64_array;
+
+    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
+        test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
+    }
+
+    #[allow(clippy::single_range_in_vec_init)]
+    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
+        test_i32_result(expr, vec![0..8], expected)
+    }
+
+    fn test_i32_result(
+        expr: &Rank,
+        ranks: Vec<Range<usize>>,
+        expected: Vec<u64>,
+    ) -> Result<()> {
+        let args = PartitionEvaluatorArgs::default();
+        let result = expr
+            .partition_evaluator(args)?
+            .evaluate_all_with_rank(8, &ranks)?;
+        let result = as_uint64_array(&result)?;
+        let result = result.values();
+        assert_eq!(expected, *result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_rank() -> Result<()> {
+        let r = Rank::default();
+        test_without_rank(&r, vec![1; 8])?;
+        test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
+        Ok(())
+    }
+}
diff --git a/datafusion/functions-window/src/row_number.rs 
b/datafusion/functions-window/src/row_number.rs
index ca4372bef2..56af14fb84 100644
--- a/datafusion/functions-window/src/row_number.rs
+++ b/datafusion/functions-window/src/row_number.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines physical expression for `row_number` that can evaluated at runtime 
during query execution
+//! `row_number` window function implementation
 
 use datafusion_common::arrow::array::ArrayRef;
 use datafusion_common::arrow::array::UInt64Array;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 177fd799ae..e07e11e431 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -39,7 +39,6 @@ pub use crate::window::cume_dist::{cume_dist, CumeDist};
 pub use crate::window::lead_lag::{lag, lead, WindowShift};
 pub use crate::window::nth_value::NthValue;
 pub use crate::window::ntile::Ntile;
-pub use crate::window::rank::{dense_rank, percent_rank, rank, Rank, RankType};
 pub use crate::PhysicalSortExpr;
 
 pub use binary::{binary, similar_to, BinaryExpr};
diff --git a/datafusion/physical-expr/src/window/mod.rs 
b/datafusion/physical-expr/src/window/mod.rs
index 2aeb053331..938bdac50f 100644
--- a/datafusion/physical-expr/src/window/mod.rs
+++ b/datafusion/physical-expr/src/window/mod.rs
@@ -22,7 +22,6 @@ pub(crate) mod cume_dist;
 pub(crate) mod lead_lag;
 pub(crate) mod nth_value;
 pub(crate) mod ntile;
-pub(crate) mod rank;
 mod sliding_aggregate;
 mod window_expr;
 
diff --git a/datafusion/physical-expr/src/window/rank.rs 
b/datafusion/physical-expr/src/window/rank.rs
deleted file mode 100644
index fa3d4e487f..0000000000
--- a/datafusion/physical-expr/src/window/rank.rs
+++ /dev/null
@@ -1,313 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines physical expression for `rank`, `dense_rank`, and `percent_rank` 
that can evaluated
-//! at runtime during query execution
-
-use crate::expressions::Column;
-use crate::window::window_expr::RankState;
-use crate::window::BuiltInWindowFunctionExpr;
-use crate::{PhysicalExpr, PhysicalSortExpr};
-
-use arrow::array::ArrayRef;
-use arrow::array::{Float64Array, UInt64Array};
-use arrow::datatypes::{DataType, Field};
-use arrow_schema::{SchemaRef, SortOptions};
-use datafusion_common::utils::get_row_at_idx;
-use datafusion_common::{exec_err, Result, ScalarValue};
-use datafusion_expr::PartitionEvaluator;
-
-use std::any::Any;
-use std::iter;
-use std::ops::Range;
-use std::sync::Arc;
-
-/// Rank calculates the rank in the window function with order by
-#[derive(Debug)]
-pub struct Rank {
-    name: String,
-    rank_type: RankType,
-    /// Output data type
-    data_type: DataType,
-}
-
-impl Rank {
-    /// Get rank_type of the rank in window function with order by
-    pub fn get_type(&self) -> RankType {
-        self.rank_type
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-pub enum RankType {
-    Basic,
-    Dense,
-    Percent,
-}
-
-/// Create a rank window function
-pub fn rank(name: String, data_type: &DataType) -> Rank {
-    Rank {
-        name,
-        rank_type: RankType::Basic,
-        data_type: data_type.clone(),
-    }
-}
-
-/// Create a dense rank window function
-pub fn dense_rank(name: String, data_type: &DataType) -> Rank {
-    Rank {
-        name,
-        rank_type: RankType::Dense,
-        data_type: data_type.clone(),
-    }
-}
-
-/// Create a percent rank window function
-pub fn percent_rank(name: String, data_type: &DataType) -> Rank {
-    Rank {
-        name,
-        rank_type: RankType::Percent,
-        data_type: data_type.clone(),
-    }
-}
-
-impl BuiltInWindowFunctionExpr for Rank {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        let nullable = false;
-        Ok(Field::new(self.name(), self.data_type.clone(), nullable))
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
-        Ok(Box::new(RankEvaluator {
-            state: RankState::default(),
-            rank_type: self.rank_type,
-        }))
-    }
-
-    fn get_result_ordering(&self, schema: &SchemaRef) -> 
Option<PhysicalSortExpr> {
-        // The built-in RANK window function (in all modes) introduces a new 
ordering:
-        schema.column_with_name(self.name()).map(|(idx, field)| {
-            let expr = Arc::new(Column::new(field.name(), idx));
-            let options = SortOptions {
-                descending: false,
-                nulls_first: false,
-            }; // ASC, NULLS LAST
-            PhysicalSortExpr { expr, options }
-        })
-    }
-}
-
-#[derive(Debug)]
-pub(crate) struct RankEvaluator {
-    state: RankState,
-    rank_type: RankType,
-}
-
-impl PartitionEvaluator for RankEvaluator {
-    fn is_causal(&self) -> bool {
-        matches!(self.rank_type, RankType::Basic | RankType::Dense)
-    }
-
-    /// Evaluates the window function inside the given range.
-    fn evaluate(
-        &mut self,
-        values: &[ArrayRef],
-        range: &Range<usize>,
-    ) -> Result<ScalarValue> {
-        let row_idx = range.start;
-        // There is no argument, values are order by column values (where rank 
is calculated)
-        let range_columns = values;
-        let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
-        let new_rank_encountered =
-            if let Some(state_last_rank_data) = &self.state.last_rank_data {
-                // if rank data changes, new rank is encountered
-                state_last_rank_data != &last_rank_data
-            } else {
-                // First rank seen
-                true
-            };
-        if new_rank_encountered {
-            self.state.last_rank_data = Some(last_rank_data);
-            self.state.last_rank_boundary += self.state.current_group_count;
-            self.state.current_group_count = 1;
-            self.state.n_rank += 1;
-        } else {
-            // data is still in the same rank
-            self.state.current_group_count += 1;
-        }
-        match self.rank_type {
-            RankType::Basic => Ok(ScalarValue::UInt64(Some(
-                self.state.last_rank_boundary as u64 + 1,
-            ))),
-            RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank 
as u64))),
-            RankType::Percent => {
-                exec_err!("Can not execute PERCENT_RANK in a streaming 
fashion")
-            }
-        }
-    }
-
-    fn evaluate_all_with_rank(
-        &self,
-        num_rows: usize,
-        ranks_in_partition: &[Range<usize>],
-    ) -> Result<ArrayRef> {
-        // see https://www.postgresql.org/docs/current/functions-window.html
-        let result: ArrayRef = match self.rank_type {
-            RankType::Dense => Arc::new(UInt64Array::from_iter_values(
-                ranks_in_partition
-                    .iter()
-                    .zip(1u64..)
-                    .flat_map(|(range, rank)| {
-                        let len = range.end - range.start;
-                        iter::repeat(rank).take(len)
-                    }),
-            )),
-            RankType::Percent => {
-                // Returns the relative rank of the current row, that is (rank 
- 1) / (total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
-                let denominator = num_rows as f64;
-                Arc::new(Float64Array::from_iter_values(
-                    ranks_in_partition
-                        .iter()
-                        .scan(0_u64, |acc, range| {
-                            let len = range.end - range.start;
-                            let value = (*acc as f64) / (denominator - 
1.0).max(1.0);
-                            let result = iter::repeat(value).take(len);
-                            *acc += len as u64;
-                            Some(result)
-                        })
-                        .flatten(),
-                ))
-            }
-            RankType::Basic => Arc::new(UInt64Array::from_iter_values(
-                ranks_in_partition
-                    .iter()
-                    .scan(1_u64, |acc, range| {
-                        let len = range.end - range.start;
-                        let result = iter::repeat(*acc).take(len);
-                        *acc += len as u64;
-                        Some(result)
-                    })
-                    .flatten(),
-            )),
-        };
-        Ok(result)
-    }
-
-    fn supports_bounded_execution(&self) -> bool {
-        matches!(self.rank_type, RankType::Basic | RankType::Dense)
-    }
-
-    fn include_rank(&self) -> bool {
-        true
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use datafusion_common::cast::{as_float64_array, as_uint64_array};
-
-    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
-        test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
-    }
-
-    #[allow(clippy::single_range_in_vec_init)]
-    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
-        test_i32_result(expr, vec![0..8], expected)
-    }
-
-    fn test_f64_result(
-        expr: &Rank,
-        num_rows: usize,
-        ranks: Vec<Range<usize>>,
-        expected: Vec<f64>,
-    ) -> Result<()> {
-        let result = expr
-            .create_evaluator()?
-            .evaluate_all_with_rank(num_rows, &ranks)?;
-        let result = as_float64_array(&result)?;
-        let result = result.values();
-        assert_eq!(expected, *result);
-        Ok(())
-    }
-
-    fn test_i32_result(
-        expr: &Rank,
-        ranks: Vec<Range<usize>>,
-        expected: Vec<u64>,
-    ) -> Result<()> {
-        let result = expr.create_evaluator()?.evaluate_all_with_rank(8, 
&ranks)?;
-        let result = as_uint64_array(&result)?;
-        let result = result.values();
-        assert_eq!(expected, *result);
-        Ok(())
-    }
-
-    #[test]
-    fn test_dense_rank() -> Result<()> {
-        let r = dense_rank("arr".into(), &DataType::UInt64);
-        test_without_rank(&r, vec![1; 8])?;
-        test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
-        Ok(())
-    }
-
-    #[test]
-    fn test_rank() -> Result<()> {
-        let r = rank("arr".into(), &DataType::UInt64);
-        test_without_rank(&r, vec![1; 8])?;
-        test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
-        Ok(())
-    }
-
-    #[test]
-    #[allow(clippy::single_range_in_vec_init)]
-    fn test_percent_rank() -> Result<()> {
-        let r = percent_rank("arr".into(), &DataType::Float64);
-
-        // empty case
-        let expected = vec![0.0; 0];
-        test_f64_result(&r, 0, vec![0..0; 0], expected)?;
-
-        // singleton case
-        let expected = vec![0.0];
-        test_f64_result(&r, 1, vec![0..1], expected)?;
-
-        // uniform case
-        let expected = vec![0.0; 7];
-        test_f64_result(&r, 7, vec![0..7], expected)?;
-
-        // non-trivial case
-        let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
-        test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;
-
-        Ok(())
-    }
-}
diff --git a/datafusion/physical-expr/src/window/window_expr.rs 
b/datafusion/physical-expr/src/window/window_expr.rs
index 8f6f78df8c..46c46fab68 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -530,19 +530,6 @@ pub enum WindowFn {
     Aggregate(Box<dyn Accumulator>),
 }
 
-/// State for the RANK(percent_rank, rank, dense_rank) built-in window 
function.
-#[derive(Debug, Clone, Default)]
-pub struct RankState {
-    /// The last values for rank as these values change, we increase n_rank
-    pub last_rank_data: Option<Vec<ScalarValue>>,
-    /// The index where last_rank_boundary is started
-    pub last_rank_boundary: usize,
-    /// Keep the number of entries in current rank
-    pub current_group_count: usize,
-    /// Rank number kept from the start
-    pub n_rank: usize,
-}
-
 /// Tag to differentiate special use cases of the NTH_VALUE built-in window 
function.
 #[derive(Debug, Copy, Clone)]
 pub enum NthValueKind {
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index ff5085a6d9..6f7d95bf95 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -21,10 +21,7 @@ use std::borrow::Borrow;
 use std::sync::Arc;
 
 use crate::{
-    expressions::{
-        cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, 
NthValue, Ntile,
-        PhysicalSortExpr,
-    },
+    expressions::{cume_dist, lag, lead, Literal, NthValue, Ntile, 
PhysicalSortExpr},
     ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr,
 };
 
@@ -231,9 +228,6 @@ fn create_built_in_window_expr(
     let out_data_type: &DataType = 
input_schema.field_with_name(&name)?.data_type();
 
     Ok(match fun {
-        BuiltInWindowFunction::Rank => Arc::new(rank(name, out_data_type)),
-        BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name, 
out_data_type)),
-        BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name, 
out_data_type)),
         BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name, 
out_data_type)),
         BuiltInWindowFunction::Ntile => {
             let n = get_scalar_value_from_args(args, 0)?.ok_or_else(|| {
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index e36c91e7d0..803cb49919 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -508,9 +508,9 @@ message ScalarUDFExprNode {
 enum BuiltInWindowFunction {
   UNSPECIFIED = 0; // 
https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum
   // ROW_NUMBER = 0;
-  RANK = 1;
-  DENSE_RANK = 2;
-  PERCENT_RANK = 3;
+  //  RANK = 1;
+  //  DENSE_RANK = 2;
+  //  PERCENT_RANK = 3;
   CUME_DIST = 4;
   NTILE = 5;
   LAG = 6;
@@ -739,7 +739,7 @@ message FileSinkConfig {
   datafusion_common.Schema output_schema = 4;
   repeated PartitionColumn table_partition_cols = 5;
   bool keep_partition_by_columns = 9;
-  InsertOp insert_op = 10; 
+  InsertOp insert_op = 10;
 }
 
 enum InsertOp {
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 004798b3ba..c7d4c4561a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -1662,9 +1662,6 @@ impl serde::Serialize for BuiltInWindowFunction {
     {
         let variant = match self {
             Self::Unspecified => "UNSPECIFIED",
-            Self::Rank => "RANK",
-            Self::DenseRank => "DENSE_RANK",
-            Self::PercentRank => "PERCENT_RANK",
             Self::CumeDist => "CUME_DIST",
             Self::Ntile => "NTILE",
             Self::Lag => "LAG",
@@ -1684,9 +1681,6 @@ impl<'de> serde::Deserialize<'de> for 
BuiltInWindowFunction {
     {
         const FIELDS: &[&str] = &[
             "UNSPECIFIED",
-            "RANK",
-            "DENSE_RANK",
-            "PERCENT_RANK",
             "CUME_DIST",
             "NTILE",
             "LAG",
@@ -1735,9 +1729,6 @@ impl<'de> serde::Deserialize<'de> for 
BuiltInWindowFunction {
             {
                 match value {
                     "UNSPECIFIED" => Ok(BuiltInWindowFunction::Unspecified),
-                    "RANK" => Ok(BuiltInWindowFunction::Rank),
-                    "DENSE_RANK" => Ok(BuiltInWindowFunction::DenseRank),
-                    "PERCENT_RANK" => Ok(BuiltInWindowFunction::PercentRank),
                     "CUME_DIST" => Ok(BuiltInWindowFunction::CumeDist),
                     "NTILE" => Ok(BuiltInWindowFunction::Ntile),
                     "LAG" => Ok(BuiltInWindowFunction::Lag),
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index 436347330d..8cba6f84f7 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1819,9 +1819,9 @@ pub enum BuiltInWindowFunction {
     /// <https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum>
     Unspecified = 0,
     /// ROW_NUMBER = 0;
-    Rank = 1,
-    DenseRank = 2,
-    PercentRank = 3,
+    ///   RANK = 1;
+    ///   DENSE_RANK = 2;
+    ///   PERCENT_RANK = 3;
     CumeDist = 4,
     Ntile = 5,
     Lag = 6,
@@ -1838,9 +1838,6 @@ impl BuiltInWindowFunction {
     pub fn as_str_name(&self) -> &'static str {
         match self {
             Self::Unspecified => "UNSPECIFIED",
-            Self::Rank => "RANK",
-            Self::DenseRank => "DENSE_RANK",
-            Self::PercentRank => "PERCENT_RANK",
             Self::CumeDist => "CUME_DIST",
             Self::Ntile => "NTILE",
             Self::Lag => "LAG",
@@ -1854,9 +1851,6 @@ impl BuiltInWindowFunction {
     pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
         match value {
             "UNSPECIFIED" => Some(Self::Unspecified),
-            "RANK" => Some(Self::Rank),
-            "DENSE_RANK" => Some(Self::DenseRank),
-            "PERCENT_RANK" => Some(Self::PercentRank),
             "CUME_DIST" => Some(Self::CumeDist),
             "NTILE" => Some(Self::Ntile),
             "LAG" => Some(Self::Lag),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 893255ccc8..32e1b68203 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -142,9 +142,6 @@ impl From<protobuf::BuiltInWindowFunction> for 
BuiltInWindowFunction {
     fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self {
         match built_in_function {
             protobuf::BuiltInWindowFunction::Unspecified => todo!(),
-            protobuf::BuiltInWindowFunction::Rank => Self::Rank,
-            protobuf::BuiltInWindowFunction::PercentRank => Self::PercentRank,
-            protobuf::BuiltInWindowFunction::DenseRank => Self::DenseRank,
             protobuf::BuiltInWindowFunction::Lag => Self::Lag,
             protobuf::BuiltInWindowFunction::Lead => Self::Lead,
             protobuf::BuiltInWindowFunction::FirstValue => Self::FirstValue,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index 63d1a007c1..07823b422f 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -119,11 +119,8 @@ impl From<&BuiltInWindowFunction> for 
protobuf::BuiltInWindowFunction {
             BuiltInWindowFunction::NthValue => Self::NthValue,
             BuiltInWindowFunction::Ntile => Self::Ntile,
             BuiltInWindowFunction::CumeDist => Self::CumeDist,
-            BuiltInWindowFunction::PercentRank => Self::PercentRank,
-            BuiltInWindowFunction::Rank => Self::Rank,
             BuiltInWindowFunction::Lag => Self::Lag,
             BuiltInWindowFunction::Lead => Self::Lead,
-            BuiltInWindowFunction::DenseRank => Self::DenseRank,
         }
     }
 }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 6f6065a1c2..85d4fe8a16 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -24,8 +24,8 @@ use datafusion::physical_expr::window::{NthValueKind, 
SlidingAggregateWindowExpr
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
 use datafusion::physical_plan::expressions::{
     BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, InListExpr, 
IsNotNullExpr,
-    IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, Rank, 
RankType,
-    TryCastExpr, WindowShift,
+    IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, TryCastExpr,
+    WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, 
PlainAggregateWindowExpr};
@@ -109,14 +109,7 @@ pub fn serialize_physical_window_expr(
         let expr = built_in_window_expr.get_built_in_func_expr();
         let built_in_fn_expr = expr.as_any();
 
-        let builtin_fn = if let Some(rank_expr) = 
built_in_fn_expr.downcast_ref::<Rank>()
-        {
-            match rank_expr.get_type() {
-                RankType::Basic => protobuf::BuiltInWindowFunction::Rank,
-                RankType::Dense => protobuf::BuiltInWindowFunction::DenseRank,
-                RankType::Percent => 
protobuf::BuiltInWindowFunction::PercentRank,
-            }
-        } else if built_in_fn_expr.downcast_ref::<CumeDist>().is_some() {
+        let builtin_fn = if 
built_in_fn_expr.downcast_ref::<CumeDist>().is_some() {
             protobuf::BuiltInWindowFunction::CumeDist
         } else if let Some(ntile_expr) = 
built_in_fn_expr.downcast_ref::<Ntile>() {
             args.insert(
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 46b1af3d32..8c9b368598 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -47,6 +47,9 @@ use datafusion::functions_aggregate::expr_fn::{
 };
 use datafusion::functions_aggregate::min_max::max_udaf;
 use datafusion::functions_nested::map::map;
+use datafusion::functions_window::dense_rank::dense_rank;
+use datafusion::functions_window::percent_rank::percent_rank;
+use datafusion::functions_window::rank::{rank, rank_udwf};
 use datafusion::functions_window::row_number::row_number;
 use datafusion::prelude::*;
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
@@ -938,6 +941,9 @@ async fn roundtrip_expr_api() -> Result<()> {
             vec![lit(10), lit(20), lit(30)],
         ),
         row_number(),
+        rank(),
+        dense_rank(),
+        percent_rank(),
         nth_value(col("b"), 1, vec![]),
         nth_value(
             col("b"),
@@ -2305,9 +2311,7 @@ fn roundtrip_window() {
 
     // 1. without window_frame
     let test_expr1 = Expr::WindowFunction(expr::WindowFunction::new(
-        WindowFunctionDefinition::BuiltInWindowFunction(
-            datafusion_expr::BuiltInWindowFunction::Rank,
-        ),
+        WindowFunctionDefinition::WindowUDF(rank_udwf()),
         vec![],
     ))
     .partition_by(vec![col("col1")])
@@ -2318,9 +2322,7 @@ fn roundtrip_window() {
 
     // 2. with default window_frame
     let test_expr2 = Expr::WindowFunction(expr::WindowFunction::new(
-        WindowFunctionDefinition::BuiltInWindowFunction(
-            datafusion_expr::BuiltInWindowFunction::Rank,
-        ),
+        WindowFunctionDefinition::WindowUDF(rank_udwf()),
         vec![],
     ))
     .partition_by(vec![col("col1")])
@@ -2337,9 +2339,7 @@ fn roundtrip_window() {
     );
 
     let test_expr3 = Expr::WindowFunction(expr::WindowFunction::new(
-        WindowFunctionDefinition::BuiltInWindowFunction(
-            datafusion_expr::BuiltInWindowFunction::Rank,
-        ),
+        WindowFunctionDefinition::WindowUDF(rank_udwf()),
         vec![],
     ))
     .partition_by(vec![col("col1")])
diff --git a/datafusion/sql/tests/common/mod.rs 
b/datafusion/sql/tests/common/mod.rs
index fe0e5f7283..47caeec78d 100644
--- a/datafusion/sql/tests/common/mod.rs
+++ b/datafusion/sql/tests/common/mod.rs
@@ -54,6 +54,7 @@ pub(crate) struct MockSessionState {
     scalar_functions: HashMap<String, Arc<ScalarUDF>>,
     aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
     expr_planners: Vec<Arc<dyn ExprPlanner>>,
+    window_functions: HashMap<String, Arc<WindowUDF>>,
     pub config_options: ConfigOptions,
 }
 
@@ -80,6 +81,12 @@ impl MockSessionState {
         );
         self
     }
+
+    pub fn with_window_function(mut self, window_function: Arc<WindowUDF>) -> 
Self {
+        self.window_functions
+            .insert(window_function.name().to_string(), window_function);
+        self
+    }
 }
 
 pub(crate) struct MockContextProvider {
@@ -217,8 +224,8 @@ impl ContextProvider for MockContextProvider {
         unimplemented!()
     }
 
-    fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
-        None
+    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
+        self.state.window_functions.get(name).cloned()
     }
 
     fn options(&self) -> &ConfigOptions {
diff --git a/datafusion/sql/tests/sql_integration.rs 
b/datafusion/sql/tests/sql_integration.rs
index 44b591fede..19f3d31321 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -48,6 +48,7 @@ use datafusion_functions_aggregate::{
     min_max::min_udaf,
 };
 use datafusion_functions_aggregate::{average::avg_udaf, 
grouping::grouping_udaf};
+use datafusion_functions_window::rank::rank_udwf;
 use rstest::rstest;
 use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
 
@@ -2633,6 +2634,7 @@ fn logical_plan_with_dialect_and_options(
         .with_aggregate_function(min_udaf())
         .with_aggregate_function(max_udaf())
         .with_aggregate_function(grouping_udaf())
+        .with_window_function(rank_udwf())
         .with_expr_planner(Arc::new(CoreFunctionPlanner::default()));
 
     let context = MockContextProvider { state };
@@ -3059,8 +3061,8 @@ fn rank_partition_grouping() {
             from
                 person
             group by rollup(state, last_name)";
-    let expected = "Projection: sum(person.age) AS total_sum, person.state, 
person.last_name, grouping(person.state) + grouping(person.last_name) AS x, 
RANK() PARTITION BY [grouping(person.state) + grouping(person.last_name), CASE 
WHEN grouping(person.last_name) = Int64(0) THEN person.state END] ORDER BY 
[sum(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS the_rank\
-        \n  WindowAggr: windowExpr=[[RANK() PARTITION BY 
[grouping(person.state) + grouping(person.last_name), CASE WHEN 
grouping(person.last_name) = Int64(0) THEN person.state END] ORDER BY 
[sum(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]\
+    let expected = "Projection: sum(person.age) AS total_sum, person.state, 
person.last_name, grouping(person.state) + grouping(person.last_name) AS x, 
rank() PARTITION BY [grouping(person.state) + grouping(person.last_name), CASE 
WHEN grouping(person.last_name) = Int64(0) THEN person.state END] ORDER BY 
[sum(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS the_rank\
+        \n  WindowAggr: windowExpr=[[rank() PARTITION BY 
[grouping(person.state) + grouping(person.last_name), CASE WHEN 
grouping(person.last_name) = Int64(0) THEN person.state END] ORDER BY 
[sum(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]\
         \n    Aggregate: groupBy=[[ROLLUP (person.state, person.last_name)]], 
aggr=[[sum(person.age), grouping(person.state), grouping(person.last_name)]]\
         \n      TableScan: person";
     quick_test(sql, expected);
diff --git a/datafusion/sqllogictest/test_files/subquery_sort.slt 
b/datafusion/sqllogictest/test_files/subquery_sort.slt
index 17affbc0ac..e4360a9269 100644
--- a/datafusion/sqllogictest/test_files/subquery_sort.slt
+++ b/datafusion/sqllogictest/test_files/subquery_sort.slt
@@ -93,14 +93,14 @@ logical_plan
 02)--Sort: t2.c1 ASC NULLS LAST, t2.c3 ASC NULLS LAST, t2.c9 ASC NULLS LAST
 03)----SubqueryAlias: t2
 04)------Sort: sink_table.c1 ASC NULLS LAST, sink_table.c3 ASC NULLS LAST, 
fetch=2
-05)--------Projection: sink_table.c1, RANK() ORDER BY [sink_table.c1 DESC 
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS r, 
sink_table.c3, sink_table.c9
-06)----------WindowAggr: windowExpr=[[RANK() ORDER BY [sink_table.c1 DESC 
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+05)--------Projection: sink_table.c1, rank() ORDER BY [sink_table.c1 DESC 
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS r, 
sink_table.c3, sink_table.c9
+06)----------WindowAggr: windowExpr=[[rank() ORDER BY [sink_table.c1 DESC 
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 07)------------TableScan: sink_table projection=[c1, c3, c9]
 physical_plan
 01)ProjectionExec: expr=[c1@0 as c1, r@1 as r]
 02)--SortExec: TopK(fetch=2), expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS 
LAST,c9@3 ASC NULLS LAST], preserve_partitioning=[false]
-03)----ProjectionExec: expr=[c1@0 as c1, RANK() ORDER BY [sink_table.c1 DESC 
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as r, c3@1 as 
c3, c9@2 as c9]
-04)------BoundedWindowAggExec: wdw=[RANK() ORDER BY [sink_table.c1 DESC NULLS 
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"RANK() ORDER BY [sink_table.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Utf8(NULL)), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]
+03)----ProjectionExec: expr=[c1@0 as c1, rank() ORDER BY [sink_table.c1 DESC 
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as r, c3@1 as 
c3, c9@2 as c9]
+04)------BoundedWindowAggExec: wdw=[rank() ORDER BY [sink_table.c1 DESC NULLS 
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"rank() ORDER BY [sink_table.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Utf8(NULL)), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]
 05)--------SortExec: expr=[c1@0 DESC], preserve_partitioning=[false]
 06)----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c3, c9], has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index cb6c6a5ace..40309a1f2d 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2636,14 +2636,14 @@ EXPLAIN SELECT
 ----
 logical_plan
 01)Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5
-02)--Projection: annotated_data_finite.ts, 
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, 
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, 
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, 
last_value(annot [...]
-03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER  [...]
+02)--Projection: annotated_data_finite.ts, 
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, 
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, 
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, 
last_value(annot [...]
+03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER  [...]
 04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, last_value(annotated_data_finite.inc_col [...]
 05)--------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 01)SortExec: TopK(fetch=5), expr=[ts@0 DESC], preserve_partitioning=[false]
-02)--ProjectionExec: expr=[ts@0 as ts, 
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, 
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, 
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, 
last_value( [...]
-03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(10)), end_bo [...]
+02)--ProjectionExec: expr=[ts@0 as ts, 
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, 
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, 
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, 
last_value( [...]
+03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(10)), end_bo [...]
 04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(1)), e [...]
 05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
diff --git a/datafusion/substrait/tests/cases/serialize.rs 
b/datafusion/substrait/tests/cases/serialize.rs
index da0898d222..72d685817d 100644
--- a/datafusion/substrait/tests/cases/serialize.rs
+++ b/datafusion/substrait/tests/cases/serialize.rs
@@ -117,8 +117,8 @@ mod tests {
         let datafusion_plan = df.into_optimized_plan()?;
         assert_eq!(
             format!("{}", datafusion_plan),
-            "Projection: data.b, RANK() PARTITION BY [data.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, data.c\
-            \n  WindowAggr: windowExpr=[[RANK() PARTITION BY [data.a] ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
+            "Projection: data.b, rank() PARTITION BY [data.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, data.c\
+            \n  WindowAggr: windowExpr=[[rank() PARTITION BY [data.a] ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
             \n    TableScan: data projection=[a, b, c]",
         );
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to