This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 747001a414 Combine the logic of rank, dense_rank and percent_rank udwf 
 to reduce duplications (#12893)
747001a414 is described below

commit 747001a41481e0cf39dc758a85d1bdb64fdeb7c0
Author: Jagdish Parihar <[email protected]>
AuthorDate: Wed Oct 16 12:21:51 2024 +0530

    Combine the logic of rank, dense_rank and percent_rank udwf  to reduce 
duplications (#12893)
    
    * wip: combining the logic of rank, dense_rank and percent_rank udwf
    
    * added test for dense_rank and percent_rank
    
    * removed unnecessary files and fixed issue for percent_rank and dense_rank 
udwf
    
    * updated the module imports for the percent_rank and dense_rank udwfs
    
    * removed data_type field from then rank struct
    
    * fixed function-window macros
    
    * removed unused function
    
    * module doc updated for rank.rs
---
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    |   3 +-
 datafusion/functions-window/src/dense_rank.rs      | 205 ----------------
 datafusion/functions-window/src/lib.rs             |  10 +-
 datafusion/functions-window/src/macros.rs          |   4 +-
 datafusion/functions-window/src/percent_rank.rs    | 192 ---------------
 datafusion/functions-window/src/rank.rs            | 263 ++++++++++++++++++---
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   4 +-
 docs/source/user-guide/sql/window_functions_new.md |  27 +++
 8 files changed, 260 insertions(+), 448 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index feffb11bf7..4a33334770 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -45,8 +45,7 @@ use datafusion_physical_expr::{PhysicalExpr, 
PhysicalSortExpr};
 use test_utils::add_empty_batches;
 
 use datafusion::functions_window::row_number::row_number_udwf;
-use datafusion_functions_window::dense_rank::dense_rank_udwf;
-use datafusion_functions_window::rank::rank_udwf;
+use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf};
 use hashbrown::HashMap;
 use rand::distributions::Alphanumeric;
 use rand::rngs::StdRng;
diff --git a/datafusion/functions-window/src/dense_rank.rs 
b/datafusion/functions-window/src/dense_rank.rs
deleted file mode 100644
index c969a7c46f..0000000000
--- a/datafusion/functions-window/src/dense_rank.rs
+++ /dev/null
@@ -1,205 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! `dense_rank` window function implementation
-
-use std::any::Any;
-use std::fmt::Debug;
-use std::iter;
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::define_udwf_and_expr;
-use crate::rank::RankState;
-use datafusion_common::arrow::array::ArrayRef;
-use datafusion_common::arrow::array::UInt64Array;
-use datafusion_common::arrow::compute::SortOptions;
-use datafusion_common::arrow::datatypes::DataType;
-use datafusion_common::arrow::datatypes::Field;
-use datafusion_common::utils::get_row_at_idx;
-use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::{PartitionEvaluator, Signature, Volatility, 
WindowUDFImpl};
-use datafusion_functions_window_common::field;
-use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
-use field::WindowUDFFieldArgs;
-
-define_udwf_and_expr!(
-    DenseRank,
-    dense_rank,
-    "Returns rank of the current row without gaps. This function counts peer 
groups"
-);
-
-/// dense_rank expression
-#[derive(Debug)]
-pub struct DenseRank {
-    signature: Signature,
-}
-
-impl DenseRank {
-    /// Create a new `dense_rank` function
-    pub fn new() -> Self {
-        Self {
-            signature: Signature::any(0, Volatility::Immutable),
-        }
-    }
-}
-
-impl Default for DenseRank {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl WindowUDFImpl for DenseRank {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        "dense_rank"
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn partition_evaluator(
-        &self,
-        _partition_evaluator_args: PartitionEvaluatorArgs,
-    ) -> Result<Box<dyn PartitionEvaluator>> {
-        Ok(Box::<DenseRankEvaluator>::default())
-    }
-
-    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
-        Ok(Field::new(field_args.name(), DataType::UInt64, false))
-    }
-
-    fn sort_options(&self) -> Option<SortOptions> {
-        Some(SortOptions {
-            descending: false,
-            nulls_first: false,
-        })
-    }
-}
-
-/// State for the `dense_rank` built-in window function.
-#[derive(Debug, Default)]
-struct DenseRankEvaluator {
-    state: RankState,
-}
-
-impl PartitionEvaluator for DenseRankEvaluator {
-    fn is_causal(&self) -> bool {
-        // The dense_rank function doesn't need "future" values to emit 
results:
-        true
-    }
-
-    fn evaluate(
-        &mut self,
-        values: &[ArrayRef],
-        range: &Range<usize>,
-    ) -> Result<ScalarValue> {
-        let row_idx = range.start;
-        // There is no argument, values are order by column values (where rank 
is calculated)
-        let range_columns = values;
-        let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
-        let new_rank_encountered =
-            if let Some(state_last_rank_data) = &self.state.last_rank_data {
-                // if rank data changes, new rank is encountered
-                state_last_rank_data != &last_rank_data
-            } else {
-                // First rank seen
-                true
-            };
-
-        if new_rank_encountered {
-            self.state.last_rank_data = Some(last_rank_data);
-            self.state.last_rank_boundary += self.state.current_group_count;
-            self.state.current_group_count = 1;
-            self.state.n_rank += 1;
-        } else {
-            // data is still in the same rank
-            self.state.current_group_count += 1;
-        }
-
-        Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64)))
-    }
-
-    fn evaluate_all_with_rank(
-        &self,
-        _num_rows: usize,
-        ranks_in_partition: &[Range<usize>],
-    ) -> Result<ArrayRef> {
-        let result = Arc::new(UInt64Array::from_iter_values(
-            ranks_in_partition
-                .iter()
-                .zip(1u64..)
-                .flat_map(|(range, rank)| {
-                    let len = range.end - range.start;
-                    iter::repeat(rank).take(len)
-                }),
-        ));
-
-        Ok(result)
-    }
-
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
-    fn include_rank(&self) -> bool {
-        true
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use datafusion_common::cast::as_uint64_array;
-
-    fn test_with_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
-        test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
-    }
-
-    #[allow(clippy::single_range_in_vec_init)]
-    fn test_without_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
-        test_i32_result(expr, vec![0..8], expected)
-    }
-
-    fn test_i32_result(
-        expr: &DenseRank,
-        ranks: Vec<Range<usize>>,
-        expected: Vec<u64>,
-    ) -> Result<()> {
-        let args = PartitionEvaluatorArgs::default();
-        let result = expr
-            .partition_evaluator(args)?
-            .evaluate_all_with_rank(8, &ranks)?;
-        let result = as_uint64_array(&result)?;
-        let result = result.values();
-        assert_eq!(expected, *result);
-        Ok(())
-    }
-
-    #[test]
-    fn test_dense_rank() -> Result<()> {
-        let r = DenseRank::default();
-        test_without_rank(&r, vec![1; 8])?;
-        test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
-        Ok(())
-    }
-}
diff --git a/datafusion/functions-window/src/lib.rs 
b/datafusion/functions-window/src/lib.rs
index b727809908..ef624e13e6 100644
--- a/datafusion/functions-window/src/lib.rs
+++ b/datafusion/functions-window/src/lib.rs
@@ -31,16 +31,12 @@ use datafusion_expr::WindowUDF;
 
 #[macro_use]
 pub mod macros;
-pub mod dense_rank;
-pub mod percent_rank;
 pub mod rank;
 pub mod row_number;
 
 /// Fluent-style API for creating `Expr`s
 pub mod expr_fn {
-    pub use super::dense_rank::dense_rank;
-    pub use super::percent_rank::percent_rank;
-    pub use super::rank::rank;
+    pub use super::rank::{dense_rank, percent_rank, rank};
     pub use super::row_number::row_number;
 }
 
@@ -49,8 +45,8 @@ pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
     vec![
         row_number::row_number_udwf(),
         rank::rank_udwf(),
-        dense_rank::dense_rank_udwf(),
-        percent_rank::percent_rank_udwf(),
+        rank::dense_rank_udwf(),
+        rank::percent_rank_udwf(),
     ]
 }
 /// Registers all enabled packages with a [`FunctionRegistry`]
diff --git a/datafusion/functions-window/src/macros.rs 
b/datafusion/functions-window/src/macros.rs
index e934f883b1..2905ccf4c2 100644
--- a/datafusion/functions-window/src/macros.rs
+++ b/datafusion/functions-window/src/macros.rs
@@ -303,7 +303,7 @@ macro_rules! create_udwf_expr {
     ($UDWF:ident, $OUT_FN_NAME:ident, $DOC:expr) => {
         paste::paste! {
             #[doc = " Create a 
[`WindowFunction`](datafusion_expr::Expr::WindowFunction) expression for"]
-            #[doc = concat!(" [`", stringify!($UDWF), "`] user-defined window 
function.")]
+            #[doc = concat!(" `", stringify!($UDWF), "` user-defined window 
function.")]
             #[doc = ""]
             #[doc = concat!(" ", $DOC)]
             pub fn $OUT_FN_NAME() -> datafusion_expr::Expr {
@@ -316,7 +316,7 @@ macro_rules! create_udwf_expr {
     ($UDWF:ident, $OUT_FN_NAME:ident, [$($PARAM:ident),+], $DOC:expr) => {
         paste::paste! {
             #[doc = " Create a 
[`WindowFunction`](datafusion_expr::Expr::WindowFunction) expression for"]
-            #[doc = concat!(" [`", stringify!($UDWF), "`] user-defined window 
function.")]
+            #[doc = concat!(" `", stringify!($UDWF), "` user-defined window 
function.")]
             #[doc = ""]
             #[doc = concat!(" ", $DOC)]
             pub fn $OUT_FN_NAME(
diff --git a/datafusion/functions-window/src/percent_rank.rs 
b/datafusion/functions-window/src/percent_rank.rs
deleted file mode 100644
index 147959f69b..0000000000
--- a/datafusion/functions-window/src/percent_rank.rs
+++ /dev/null
@@ -1,192 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! `percent_rank` window function implementation
-
-use std::any::Any;
-use std::fmt::Debug;
-use std::iter;
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::define_udwf_and_expr;
-use datafusion_common::arrow::array::ArrayRef;
-use datafusion_common::arrow::array::Float64Array;
-use datafusion_common::arrow::compute::SortOptions;
-use datafusion_common::arrow::datatypes::DataType;
-use datafusion_common::arrow::datatypes::Field;
-use datafusion_common::{exec_err, Result, ScalarValue};
-use datafusion_expr::{PartitionEvaluator, Signature, Volatility, 
WindowUDFImpl};
-use datafusion_functions_window_common::field;
-use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
-use field::WindowUDFFieldArgs;
-
-define_udwf_and_expr!(
-    PercentRank,
-    percent_rank,
-    "Returns the relative rank of the current row: (rank - 1) / (total rows - 
1)"
-);
-
-/// percent_rank expression
-#[derive(Debug)]
-pub struct PercentRank {
-    signature: Signature,
-}
-
-impl PercentRank {
-    /// Create a new `percent_rank` function
-    pub fn new() -> Self {
-        Self {
-            signature: Signature::any(0, Volatility::Immutable),
-        }
-    }
-}
-
-impl Default for PercentRank {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl WindowUDFImpl for PercentRank {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        "percent_rank"
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn partition_evaluator(
-        &self,
-        _partition_evaluator_args: PartitionEvaluatorArgs,
-    ) -> Result<Box<dyn PartitionEvaluator>> {
-        Ok(Box::<PercentRankEvaluator>::default())
-    }
-
-    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
-        Ok(Field::new(field_args.name(), DataType::Float64, false))
-    }
-
-    fn sort_options(&self) -> Option<SortOptions> {
-        Some(SortOptions {
-            descending: false,
-            nulls_first: false,
-        })
-    }
-}
-
-/// State for the `percent_rank` built-in window function.
-#[derive(Debug, Default)]
-struct PercentRankEvaluator {}
-
-impl PartitionEvaluator for PercentRankEvaluator {
-    fn is_causal(&self) -> bool {
-        // The percent_rank function doesn't need "future" values to emit 
results:
-        false
-    }
-
-    fn evaluate(
-        &mut self,
-        _values: &[ArrayRef],
-        _range: &Range<usize>,
-    ) -> Result<ScalarValue> {
-        exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
-    }
-
-    fn evaluate_all_with_rank(
-        &self,
-        num_rows: usize,
-        ranks_in_partition: &[Range<usize>],
-    ) -> Result<ArrayRef> {
-        let denominator = num_rows as f64;
-        let result =
-        // Returns the relative rank of the current row, that is (rank - 1) / 
(total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
-        Arc::new(Float64Array::from_iter_values(
-            ranks_in_partition
-                .iter()
-                .scan(0_u64, |acc, range| {
-                    let len = range.end - range.start;
-                    let value = (*acc as f64) / (denominator - 1.0).max(1.0);
-                    let result = iter::repeat(value).take(len);
-                    *acc += len as u64;
-                    Some(result)
-                })
-                .flatten(),
-        ));
-
-        Ok(result)
-    }
-
-    fn supports_bounded_execution(&self) -> bool {
-        false
-    }
-
-    fn include_rank(&self) -> bool {
-        true
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use datafusion_common::cast::as_float64_array;
-
-    fn test_f64_result(
-        expr: &PercentRank,
-        num_rows: usize,
-        ranks: Vec<Range<usize>>,
-        expected: Vec<f64>,
-    ) -> Result<()> {
-        let args = PartitionEvaluatorArgs::default();
-        let result = expr
-            .partition_evaluator(args)?
-            .evaluate_all_with_rank(num_rows, &ranks)?;
-        let result = as_float64_array(&result)?;
-        let result = result.values();
-        assert_eq!(expected, *result);
-        Ok(())
-    }
-
-    #[test]
-    #[allow(clippy::single_range_in_vec_init)]
-    fn test_percent_rank() -> Result<()> {
-        let r = PercentRank::default();
-
-        // empty case
-        let expected = vec![0.0; 0];
-        test_f64_result(&r, 0, vec![0..0; 0], expected)?;
-
-        // singleton case
-        let expected = vec![0.0];
-        test_f64_result(&r, 1, vec![0..1], expected)?;
-
-        // uniform case
-        let expected = vec![0.0; 7];
-        test_f64_result(&r, 7, vec![0..7], expected)?;
-
-        // non-trivial case
-        let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
-        test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;
-
-        Ok(())
-    }
-}
diff --git a/datafusion/functions-window/src/rank.rs 
b/datafusion/functions-window/src/rank.rs
index c52dec9061..06c3f49055 100644
--- a/datafusion/functions-window/src/rank.rs
+++ b/datafusion/functions-window/src/rank.rs
@@ -15,23 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! `rank` window function implementation
+//! Implementation of `rank`, `dense_rank`, and `percent_rank` window 
functions,
+//! which can be evaluated at runtime during query execution.
 
 use std::any::Any;
 use std::fmt::Debug;
 use std::iter;
 use std::ops::Range;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 
 use crate::define_udwf_and_expr;
 use datafusion_common::arrow::array::ArrayRef;
-use datafusion_common::arrow::array::UInt64Array;
+use datafusion_common::arrow::array::{Float64Array, UInt64Array};
 use datafusion_common::arrow::compute::SortOptions;
 use datafusion_common::arrow::datatypes::DataType;
 use datafusion_common::arrow::datatypes::Field;
 use datafusion_common::utils::get_row_at_idx;
-use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::{PartitionEvaluator, Signature, Volatility, 
WindowUDFImpl};
+use datafusion_common::{exec_err, Result, ScalarValue};
+use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
+use datafusion_expr::{
+    Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
+};
 use datafusion_functions_window_common::field;
 use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
 use field::WindowUDFFieldArgs;
@@ -39,28 +43,113 @@ use field::WindowUDFFieldArgs;
 define_udwf_and_expr!(
     Rank,
     rank,
-    "Returns rank of the current row with gaps. Same as `row_number` of its 
first peer"
+    "Returns rank of the current row with gaps. Same as `row_number` of its 
first peer",
+    Rank::basic
 );
 
-/// rank expression
+define_udwf_and_expr!(
+    DenseRank,
+    dense_rank,
+    "Returns rank of the current row without gaps. This function counts peer 
groups",
+    Rank::dense_rank
+);
+
+define_udwf_and_expr!(
+    PercentRank,
+    percent_rank,
+    "Returns the relative rank of the current row: (rank - 1) / (total rows - 
1)",
+    Rank::percent_rank
+);
+
+/// Rank calculates the rank in the window function with order by
 #[derive(Debug)]
 pub struct Rank {
+    name: String,
     signature: Signature,
+    rank_type: RankType,
 }
 
 impl Rank {
-    /// Create a new `rank` function
-    pub fn new() -> Self {
+    /// Create a new `rank` function with the specified name and rank type
+    pub fn new(name: String, rank_type: RankType) -> Self {
         Self {
+            name,
             signature: Signature::any(0, Volatility::Immutable),
+            rank_type,
         }
     }
-}
 
-impl Default for Rank {
-    fn default() -> Self {
-        Self::new()
+    /// Create a `rank` window function
+    pub fn basic() -> Self {
+        Rank::new("rank".to_string(), RankType::Basic)
     }
+
+    /// Create a `dense_rank` window function
+    pub fn dense_rank() -> Self {
+        Rank::new("dense_rank".to_string(), RankType::Dense)
+    }
+
+    /// Create a `percent_rank` window function
+    pub fn percent_rank() -> Self {
+        Rank::new("percent_rank".to_string(), RankType::Percent)
+    }
+}
+
+#[derive(Debug, Copy, Clone)]
+pub enum RankType {
+    Basic,
+    Dense,
+    Percent,
+}
+
+static RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
+
+fn get_rank_doc() -> &'static Documentation {
+    RANK_DOCUMENTATION.get_or_init(|| {
+        Documentation::builder()
+            .with_doc_section(DOC_SECTION_RANKING)
+            .with_description(
+                "Returns the rank of the current row within its partition, 
allowing \
+                gaps between ranks. This function provides a ranking similar 
to `row_number`, but \
+                skips ranks for identical values.",
+            )
+            .with_syntax_example("rank()")
+            .build()
+            .unwrap()
+    })
+}
+
+static DENSE_RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
+
+fn get_dense_rank_doc() -> &'static Documentation {
+    DENSE_RANK_DOCUMENTATION.get_or_init(|| {
+        Documentation::builder()
+            .with_doc_section(DOC_SECTION_RANKING)
+            .with_description(
+                "Returns the rank of the current row without gaps. This 
function ranks \
+                rows in a dense manner, meaning consecutive ranks are assigned 
even for identical \
+                values.",
+            )
+            .with_syntax_example("dense_rank()")
+            .build()
+            .unwrap()
+    })
+}
+
+static PERCENT_RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
+
+fn get_percent_rank_doc() -> &'static Documentation {
+    PERCENT_RANK_DOCUMENTATION.get_or_init(|| {
+        Documentation::builder()
+            .with_doc_section(DOC_SECTION_RANKING)
+            .with_description(
+                "Returns the percentage rank of the current row within its 
partition. \
+                The value ranges from 0 to 1 and is computed as `(rank - 1) / 
(total_rows - 1)`.",
+            )
+            .with_syntax_example("percent_rank()")
+            .build()
+            .unwrap()
+    })
 }
 
 impl WindowUDFImpl for Rank {
@@ -69,7 +158,7 @@ impl WindowUDFImpl for Rank {
     }
 
     fn name(&self) -> &str {
-        "rank"
+        &self.name
     }
 
     fn signature(&self) -> &Signature {
@@ -80,11 +169,20 @@ impl WindowUDFImpl for Rank {
         &self,
         _partition_evaluator_args: PartitionEvaluatorArgs,
     ) -> Result<Box<dyn PartitionEvaluator>> {
-        Ok(Box::<RankEvaluator>::default())
+        Ok(Box::new(RankEvaluator {
+            state: RankState::default(),
+            rank_type: self.rank_type,
+        }))
     }
 
     fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
-        Ok(Field::new(field_args.name(), DataType::UInt64, false))
+        let return_type = match self.rank_type {
+            RankType::Basic | RankType::Dense => DataType::UInt64,
+            RankType::Percent => DataType::Float64,
+        };
+
+        let nullable = false;
+        Ok(Field::new(field_args.name(), return_type, nullable))
     }
 
     fn sort_options(&self) -> Option<SortOptions> {
@@ -93,6 +191,14 @@ impl WindowUDFImpl for Rank {
             nulls_first: false,
         })
     }
+
+    fn documentation(&self) -> Option<&Documentation> {
+        match self.rank_type {
+            RankType::Basic => Some(get_rank_doc()),
+            RankType::Dense => Some(get_dense_rank_doc()),
+            RankType::Percent => Some(get_percent_rank_doc()),
+        }
+    }
 }
 
 /// State for the RANK(rank) built-in window function.
@@ -109,15 +215,15 @@ pub struct RankState {
 }
 
 /// State for the `rank` built-in window function.
-#[derive(Debug, Default)]
+#[derive(Debug)]
 struct RankEvaluator {
     state: RankState,
+    rank_type: RankType,
 }
 
 impl PartitionEvaluator for RankEvaluator {
     fn is_causal(&self) -> bool {
-        // The rank function doesn't need "future" values to emit results:
-        true
+        matches!(self.rank_type, RankType::Basic | RankType::Dense)
     }
 
     fn evaluate(
@@ -147,33 +253,68 @@ impl PartitionEvaluator for RankEvaluator {
             self.state.current_group_count += 1;
         }
 
-        Ok(ScalarValue::UInt64(Some(
-            self.state.last_rank_boundary as u64 + 1,
-        )))
+        match self.rank_type {
+            RankType::Basic => Ok(ScalarValue::UInt64(Some(
+                self.state.last_rank_boundary as u64 + 1,
+            ))),
+            RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank 
as u64))),
+            RankType::Percent => {
+                exec_err!("Can not execute PERCENT_RANK in a streaming 
fashion")
+            }
+        }
     }
 
     fn evaluate_all_with_rank(
         &self,
-        _num_rows: usize,
+        num_rows: usize,
         ranks_in_partition: &[Range<usize>],
     ) -> Result<ArrayRef> {
-        let result = Arc::new(UInt64Array::from_iter_values(
-            ranks_in_partition
-                .iter()
-                .scan(1_u64, |acc, range| {
-                    let len = range.end - range.start;
-                    let result = iter::repeat(*acc).take(len);
-                    *acc += len as u64;
-                    Some(result)
-                })
-                .flatten(),
-        ));
+        let result: ArrayRef = match self.rank_type {
+            RankType::Basic => Arc::new(UInt64Array::from_iter_values(
+                ranks_in_partition
+                    .iter()
+                    .scan(1_u64, |acc, range| {
+                        let len = range.end - range.start;
+                        let result = iter::repeat(*acc).take(len);
+                        *acc += len as u64;
+                        Some(result)
+                    })
+                    .flatten(),
+            )),
+
+            RankType::Dense => Arc::new(UInt64Array::from_iter_values(
+                ranks_in_partition
+                    .iter()
+                    .zip(1u64..)
+                    .flat_map(|(range, rank)| {
+                        let len = range.end - range.start;
+                        iter::repeat(rank).take(len)
+                    }),
+            )),
+
+            RankType::Percent => {
+                let denominator = num_rows as f64;
+
+                Arc::new(Float64Array::from_iter_values(
+                    ranks_in_partition
+                        .iter()
+                        .scan(0_u64, |acc, range| {
+                            let len = range.end - range.start;
+                            let value = (*acc as f64) / (denominator - 
1.0).max(1.0);
+                            let result = iter::repeat(value).take(len);
+                            *acc += len as u64;
+                            Some(result)
+                        })
+                        .flatten(),
+                ))
+            }
+        };
 
         Ok(result)
     }
 
     fn supports_bounded_execution(&self) -> bool {
-        true
+        matches!(self.rank_type, RankType::Basic | RankType::Dense)
     }
 
     fn include_rank(&self) -> bool {
@@ -184,7 +325,7 @@ impl PartitionEvaluator for RankEvaluator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion_common::cast::as_uint64_array;
+    use datafusion_common::cast::{as_float64_array, as_uint64_array};
 
     fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
         test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
@@ -210,11 +351,59 @@ mod tests {
         Ok(())
     }
 
+    fn test_f64_result(
+        expr: &Rank,
+        num_rows: usize,
+        ranks: Vec<Range<usize>>,
+        expected: Vec<f64>,
+    ) -> Result<()> {
+        let args = PartitionEvaluatorArgs::default();
+        let result = expr
+            .partition_evaluator(args)?
+            .evaluate_all_with_rank(num_rows, &ranks)?;
+        let result = as_float64_array(&result)?;
+        let result = result.values();
+        assert_eq!(expected, *result);
+        Ok(())
+    }
+
     #[test]
     fn test_rank() -> Result<()> {
-        let r = Rank::default();
+        let r = Rank::basic();
         test_without_rank(&r, vec![1; 8])?;
         test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
         Ok(())
     }
+
+    #[test]
+    fn test_dense_rank() -> Result<()> {
+        let r = Rank::dense_rank();
+        test_without_rank(&r, vec![1; 8])?;
+        test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
+        Ok(())
+    }
+
+    #[test]
+    #[allow(clippy::single_range_in_vec_init)]
+    fn test_percent_rank() -> Result<()> {
+        let r = Rank::percent_rank();
+
+        // empty case
+        let expected = vec![0.0; 0];
+        test_f64_result(&r, 0, vec![0..0; 0], expected)?;
+
+        // singleton case
+        let expected = vec![0.0];
+        test_f64_result(&r, 1, vec![0..1], expected)?;
+
+        // uniform case
+        let expected = vec![0.0; 7];
+        test_f64_result(&r, 7, vec![0..7], expected)?;
+
+        // non-trivial case
+        let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
+        test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;
+
+        Ok(())
+    }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8c9b368598..ffa8fc1eef 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -47,9 +47,7 @@ use datafusion::functions_aggregate::expr_fn::{
 };
 use datafusion::functions_aggregate::min_max::max_udaf;
 use datafusion::functions_nested::map::map;
-use datafusion::functions_window::dense_rank::dense_rank;
-use datafusion::functions_window::percent_rank::percent_rank;
-use datafusion::functions_window::rank::{rank, rank_udwf};
+use datafusion::functions_window::rank::{dense_rank, percent_rank, rank, 
rank_udwf};
 use datafusion::functions_window::row_number::row_number;
 use datafusion::prelude::*;
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
diff --git a/docs/source/user-guide/sql/window_functions_new.md 
b/docs/source/user-guide/sql/window_functions_new.md
index ee981911f5..462fc900d1 100644
--- a/docs/source/user-guide/sql/window_functions_new.md
+++ b/docs/source/user-guide/sql/window_functions_new.md
@@ -157,8 +157,35 @@ All [aggregate functions](aggregate_functions.md) can be 
used as window function
 
 ## Ranking Functions
 
+- [dense_rank](#dense_rank)
+- [percent_rank](#percent_rank)
+- [rank](#rank)
 - [row_number](#row_number)
 
+### `dense_rank`
+
+Returns the rank of the current row without gaps. This function ranks rows in 
a dense manner, meaning consecutive ranks are assigned even for identical 
values.
+
+```
+dense_rank()
+```
+
+### `percent_rank`
+
+Returns the percentage rank of the current row within its partition. The value 
ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.
+
+```
+percent_rank()
+```
+
+### `rank`
+
+Returns the rank of the current row within its partition, allowing gaps 
between ranks. This function provides a ranking similar to `row_number`, but 
skips ranks for identical values.
+
+```
+rank()
+```
+
 ### `row_number`
 
 Number of the current row within its partition, counting from 1.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to