This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 747001a414 Combine the logic of rank, dense_rank and percent_rank udwf
to reduce duplications (#12893)
747001a414 is described below
commit 747001a41481e0cf39dc758a85d1bdb64fdeb7c0
Author: Jagdish Parihar <[email protected]>
AuthorDate: Wed Oct 16 12:21:51 2024 +0530
Combine the logic of rank, dense_rank and percent_rank udwf to reduce
duplications (#12893)
* wip: combining the logic of rank, dense_rank and percent_rank udwf
* added test for dense_rank and percent_rank
* removed unnecessary files and fixed issue for percent_rank and dense_rank
udwf
* updated the module imports for the percent_rank and dense_rank udwfs
* removed data_type field from the rank struct
* fixed function-window macros
* removed unused function
* module doc updated for rank.rs
---
datafusion/core/tests/fuzz_cases/window_fuzz.rs | 3 +-
datafusion/functions-window/src/dense_rank.rs | 205 ----------------
datafusion/functions-window/src/lib.rs | 10 +-
datafusion/functions-window/src/macros.rs | 4 +-
datafusion/functions-window/src/percent_rank.rs | 192 ---------------
datafusion/functions-window/src/rank.rs | 263 ++++++++++++++++++---
.../proto/tests/cases/roundtrip_logical_plan.rs | 4 +-
docs/source/user-guide/sql/window_functions_new.md | 27 +++
8 files changed, 260 insertions(+), 448 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index feffb11bf7..4a33334770 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -45,8 +45,7 @@ use datafusion_physical_expr::{PhysicalExpr,
PhysicalSortExpr};
use test_utils::add_empty_batches;
use datafusion::functions_window::row_number::row_number_udwf;
-use datafusion_functions_window::dense_rank::dense_rank_udwf;
-use datafusion_functions_window::rank::rank_udwf;
+use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf};
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
diff --git a/datafusion/functions-window/src/dense_rank.rs
b/datafusion/functions-window/src/dense_rank.rs
deleted file mode 100644
index c969a7c46f..0000000000
--- a/datafusion/functions-window/src/dense_rank.rs
+++ /dev/null
@@ -1,205 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! `dense_rank` window function implementation
-
-use std::any::Any;
-use std::fmt::Debug;
-use std::iter;
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::define_udwf_and_expr;
-use crate::rank::RankState;
-use datafusion_common::arrow::array::ArrayRef;
-use datafusion_common::arrow::array::UInt64Array;
-use datafusion_common::arrow::compute::SortOptions;
-use datafusion_common::arrow::datatypes::DataType;
-use datafusion_common::arrow::datatypes::Field;
-use datafusion_common::utils::get_row_at_idx;
-use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::{PartitionEvaluator, Signature, Volatility,
WindowUDFImpl};
-use datafusion_functions_window_common::field;
-use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
-use field::WindowUDFFieldArgs;
-
-define_udwf_and_expr!(
- DenseRank,
- dense_rank,
- "Returns rank of the current row without gaps. This function counts peer
groups"
-);
-
-/// dense_rank expression
-#[derive(Debug)]
-pub struct DenseRank {
- signature: Signature,
-}
-
-impl DenseRank {
- /// Create a new `dense_rank` function
- pub fn new() -> Self {
- Self {
- signature: Signature::any(0, Volatility::Immutable),
- }
- }
-}
-
-impl Default for DenseRank {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl WindowUDFImpl for DenseRank {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn name(&self) -> &str {
- "dense_rank"
- }
-
- fn signature(&self) -> &Signature {
- &self.signature
- }
-
- fn partition_evaluator(
- &self,
- _partition_evaluator_args: PartitionEvaluatorArgs,
- ) -> Result<Box<dyn PartitionEvaluator>> {
- Ok(Box::<DenseRankEvaluator>::default())
- }
-
- fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
- Ok(Field::new(field_args.name(), DataType::UInt64, false))
- }
-
- fn sort_options(&self) -> Option<SortOptions> {
- Some(SortOptions {
- descending: false,
- nulls_first: false,
- })
- }
-}
-
-/// State for the `dense_rank` built-in window function.
-#[derive(Debug, Default)]
-struct DenseRankEvaluator {
- state: RankState,
-}
-
-impl PartitionEvaluator for DenseRankEvaluator {
- fn is_causal(&self) -> bool {
- // The dense_rank function doesn't need "future" values to emit
results:
- true
- }
-
- fn evaluate(
- &mut self,
- values: &[ArrayRef],
- range: &Range<usize>,
- ) -> Result<ScalarValue> {
- let row_idx = range.start;
- // There is no argument, values are order by column values (where rank
is calculated)
- let range_columns = values;
- let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
- let new_rank_encountered =
- if let Some(state_last_rank_data) = &self.state.last_rank_data {
- // if rank data changes, new rank is encountered
- state_last_rank_data != &last_rank_data
- } else {
- // First rank seen
- true
- };
-
- if new_rank_encountered {
- self.state.last_rank_data = Some(last_rank_data);
- self.state.last_rank_boundary += self.state.current_group_count;
- self.state.current_group_count = 1;
- self.state.n_rank += 1;
- } else {
- // data is still in the same rank
- self.state.current_group_count += 1;
- }
-
- Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64)))
- }
-
- fn evaluate_all_with_rank(
- &self,
- _num_rows: usize,
- ranks_in_partition: &[Range<usize>],
- ) -> Result<ArrayRef> {
- let result = Arc::new(UInt64Array::from_iter_values(
- ranks_in_partition
- .iter()
- .zip(1u64..)
- .flat_map(|(range, rank)| {
- let len = range.end - range.start;
- iter::repeat(rank).take(len)
- }),
- ));
-
- Ok(result)
- }
-
- fn supports_bounded_execution(&self) -> bool {
- true
- }
-
- fn include_rank(&self) -> bool {
- true
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use datafusion_common::cast::as_uint64_array;
-
- fn test_with_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
- test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
- }
-
- #[allow(clippy::single_range_in_vec_init)]
- fn test_without_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
- test_i32_result(expr, vec![0..8], expected)
- }
-
- fn test_i32_result(
- expr: &DenseRank,
- ranks: Vec<Range<usize>>,
- expected: Vec<u64>,
- ) -> Result<()> {
- let args = PartitionEvaluatorArgs::default();
- let result = expr
- .partition_evaluator(args)?
- .evaluate_all_with_rank(8, &ranks)?;
- let result = as_uint64_array(&result)?;
- let result = result.values();
- assert_eq!(expected, *result);
- Ok(())
- }
-
- #[test]
- fn test_dense_rank() -> Result<()> {
- let r = DenseRank::default();
- test_without_rank(&r, vec![1; 8])?;
- test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
- Ok(())
- }
-}
diff --git a/datafusion/functions-window/src/lib.rs
b/datafusion/functions-window/src/lib.rs
index b727809908..ef624e13e6 100644
--- a/datafusion/functions-window/src/lib.rs
+++ b/datafusion/functions-window/src/lib.rs
@@ -31,16 +31,12 @@ use datafusion_expr::WindowUDF;
#[macro_use]
pub mod macros;
-pub mod dense_rank;
-pub mod percent_rank;
pub mod rank;
pub mod row_number;
/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
- pub use super::dense_rank::dense_rank;
- pub use super::percent_rank::percent_rank;
- pub use super::rank::rank;
+ pub use super::rank::{dense_rank, percent_rank, rank};
pub use super::row_number::row_number;
}
@@ -49,8 +45,8 @@ pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
vec![
row_number::row_number_udwf(),
rank::rank_udwf(),
- dense_rank::dense_rank_udwf(),
- percent_rank::percent_rank_udwf(),
+ rank::dense_rank_udwf(),
+ rank::percent_rank_udwf(),
]
}
/// Registers all enabled packages with a [`FunctionRegistry`]
diff --git a/datafusion/functions-window/src/macros.rs
b/datafusion/functions-window/src/macros.rs
index e934f883b1..2905ccf4c2 100644
--- a/datafusion/functions-window/src/macros.rs
+++ b/datafusion/functions-window/src/macros.rs
@@ -303,7 +303,7 @@ macro_rules! create_udwf_expr {
($UDWF:ident, $OUT_FN_NAME:ident, $DOC:expr) => {
paste::paste! {
#[doc = " Create a
[`WindowFunction`](datafusion_expr::Expr::WindowFunction) expression for"]
- #[doc = concat!(" [`", stringify!($UDWF), "`] user-defined window
function.")]
+ #[doc = concat!(" `", stringify!($UDWF), "` user-defined window
function.")]
#[doc = ""]
#[doc = concat!(" ", $DOC)]
pub fn $OUT_FN_NAME() -> datafusion_expr::Expr {
@@ -316,7 +316,7 @@ macro_rules! create_udwf_expr {
($UDWF:ident, $OUT_FN_NAME:ident, [$($PARAM:ident),+], $DOC:expr) => {
paste::paste! {
#[doc = " Create a
[`WindowFunction`](datafusion_expr::Expr::WindowFunction) expression for"]
- #[doc = concat!(" [`", stringify!($UDWF), "`] user-defined window
function.")]
+ #[doc = concat!(" `", stringify!($UDWF), "` user-defined window
function.")]
#[doc = ""]
#[doc = concat!(" ", $DOC)]
pub fn $OUT_FN_NAME(
diff --git a/datafusion/functions-window/src/percent_rank.rs
b/datafusion/functions-window/src/percent_rank.rs
deleted file mode 100644
index 147959f69b..0000000000
--- a/datafusion/functions-window/src/percent_rank.rs
+++ /dev/null
@@ -1,192 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! `percent_rank` window function implementation
-
-use std::any::Any;
-use std::fmt::Debug;
-use std::iter;
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::define_udwf_and_expr;
-use datafusion_common::arrow::array::ArrayRef;
-use datafusion_common::arrow::array::Float64Array;
-use datafusion_common::arrow::compute::SortOptions;
-use datafusion_common::arrow::datatypes::DataType;
-use datafusion_common::arrow::datatypes::Field;
-use datafusion_common::{exec_err, Result, ScalarValue};
-use datafusion_expr::{PartitionEvaluator, Signature, Volatility,
WindowUDFImpl};
-use datafusion_functions_window_common::field;
-use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
-use field::WindowUDFFieldArgs;
-
-define_udwf_and_expr!(
- PercentRank,
- percent_rank,
- "Returns the relative rank of the current row: (rank - 1) / (total rows -
1)"
-);
-
-/// percent_rank expression
-#[derive(Debug)]
-pub struct PercentRank {
- signature: Signature,
-}
-
-impl PercentRank {
- /// Create a new `percent_rank` function
- pub fn new() -> Self {
- Self {
- signature: Signature::any(0, Volatility::Immutable),
- }
- }
-}
-
-impl Default for PercentRank {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl WindowUDFImpl for PercentRank {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn name(&self) -> &str {
- "percent_rank"
- }
-
- fn signature(&self) -> &Signature {
- &self.signature
- }
-
- fn partition_evaluator(
- &self,
- _partition_evaluator_args: PartitionEvaluatorArgs,
- ) -> Result<Box<dyn PartitionEvaluator>> {
- Ok(Box::<PercentRankEvaluator>::default())
- }
-
- fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
- Ok(Field::new(field_args.name(), DataType::Float64, false))
- }
-
- fn sort_options(&self) -> Option<SortOptions> {
- Some(SortOptions {
- descending: false,
- nulls_first: false,
- })
- }
-}
-
-/// State for the `percent_rank` built-in window function.
-#[derive(Debug, Default)]
-struct PercentRankEvaluator {}
-
-impl PartitionEvaluator for PercentRankEvaluator {
- fn is_causal(&self) -> bool {
- // The percent_rank function doesn't need "future" values to emit
results:
- false
- }
-
- fn evaluate(
- &mut self,
- _values: &[ArrayRef],
- _range: &Range<usize>,
- ) -> Result<ScalarValue> {
- exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
- }
-
- fn evaluate_all_with_rank(
- &self,
- num_rows: usize,
- ranks_in_partition: &[Range<usize>],
- ) -> Result<ArrayRef> {
- let denominator = num_rows as f64;
- let result =
- // Returns the relative rank of the current row, that is (rank - 1) /
(total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
- Arc::new(Float64Array::from_iter_values(
- ranks_in_partition
- .iter()
- .scan(0_u64, |acc, range| {
- let len = range.end - range.start;
- let value = (*acc as f64) / (denominator - 1.0).max(1.0);
- let result = iter::repeat(value).take(len);
- *acc += len as u64;
- Some(result)
- })
- .flatten(),
- ));
-
- Ok(result)
- }
-
- fn supports_bounded_execution(&self) -> bool {
- false
- }
-
- fn include_rank(&self) -> bool {
- true
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use datafusion_common::cast::as_float64_array;
-
- fn test_f64_result(
- expr: &PercentRank,
- num_rows: usize,
- ranks: Vec<Range<usize>>,
- expected: Vec<f64>,
- ) -> Result<()> {
- let args = PartitionEvaluatorArgs::default();
- let result = expr
- .partition_evaluator(args)?
- .evaluate_all_with_rank(num_rows, &ranks)?;
- let result = as_float64_array(&result)?;
- let result = result.values();
- assert_eq!(expected, *result);
- Ok(())
- }
-
- #[test]
- #[allow(clippy::single_range_in_vec_init)]
- fn test_percent_rank() -> Result<()> {
- let r = PercentRank::default();
-
- // empty case
- let expected = vec![0.0; 0];
- test_f64_result(&r, 0, vec![0..0; 0], expected)?;
-
- // singleton case
- let expected = vec![0.0];
- test_f64_result(&r, 1, vec![0..1], expected)?;
-
- // uniform case
- let expected = vec![0.0; 7];
- test_f64_result(&r, 7, vec![0..7], expected)?;
-
- // non-trivial case
- let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
- test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;
-
- Ok(())
- }
-}
diff --git a/datafusion/functions-window/src/rank.rs
b/datafusion/functions-window/src/rank.rs
index c52dec9061..06c3f49055 100644
--- a/datafusion/functions-window/src/rank.rs
+++ b/datafusion/functions-window/src/rank.rs
@@ -15,23 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-//! `rank` window function implementation
+//! Implementation of `rank`, `dense_rank`, and `percent_rank` window
functions,
+//! which can be evaluated at runtime during query execution.
use std::any::Any;
use std::fmt::Debug;
use std::iter;
use std::ops::Range;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
use crate::define_udwf_and_expr;
use datafusion_common::arrow::array::ArrayRef;
-use datafusion_common::arrow::array::UInt64Array;
+use datafusion_common::arrow::array::{Float64Array, UInt64Array};
use datafusion_common::arrow::compute::SortOptions;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::arrow::datatypes::Field;
use datafusion_common::utils::get_row_at_idx;
-use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::{PartitionEvaluator, Signature, Volatility,
WindowUDFImpl};
+use datafusion_common::{exec_err, Result, ScalarValue};
+use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
+use datafusion_expr::{
+ Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
+};
use datafusion_functions_window_common::field;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use field::WindowUDFFieldArgs;
@@ -39,28 +43,113 @@ use field::WindowUDFFieldArgs;
define_udwf_and_expr!(
Rank,
rank,
- "Returns rank of the current row with gaps. Same as `row_number` of its
first peer"
+ "Returns rank of the current row with gaps. Same as `row_number` of its
first peer",
+ Rank::basic
);
-/// rank expression
+define_udwf_and_expr!(
+ DenseRank,
+ dense_rank,
+ "Returns rank of the current row without gaps. This function counts peer
groups",
+ Rank::dense_rank
+);
+
+define_udwf_and_expr!(
+ PercentRank,
+ percent_rank,
+ "Returns the relative rank of the current row: (rank - 1) / (total rows -
1)",
+ Rank::percent_rank
+);
+
+/// Rank calculates the rank in the window function with order by
#[derive(Debug)]
pub struct Rank {
+ name: String,
signature: Signature,
+ rank_type: RankType,
}
impl Rank {
- /// Create a new `rank` function
- pub fn new() -> Self {
+ /// Create a new `rank` function with the specified name and rank type
+ pub fn new(name: String, rank_type: RankType) -> Self {
Self {
+ name,
signature: Signature::any(0, Volatility::Immutable),
+ rank_type,
}
}
-}
-impl Default for Rank {
- fn default() -> Self {
- Self::new()
+ /// Create a `rank` window function
+ pub fn basic() -> Self {
+ Rank::new("rank".to_string(), RankType::Basic)
}
+
+ /// Create a `dense_rank` window function
+ pub fn dense_rank() -> Self {
+ Rank::new("dense_rank".to_string(), RankType::Dense)
+ }
+
+ /// Create a `percent_rank` window function
+ pub fn percent_rank() -> Self {
+ Rank::new("percent_rank".to_string(), RankType::Percent)
+ }
+}
+
+#[derive(Debug, Copy, Clone)]
+pub enum RankType {
+ Basic,
+ Dense,
+ Percent,
+}
+
+static RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
+
+fn get_rank_doc() -> &'static Documentation {
+ RANK_DOCUMENTATION.get_or_init(|| {
+ Documentation::builder()
+ .with_doc_section(DOC_SECTION_RANKING)
+ .with_description(
+ "Returns the rank of the current row within its partition,
allowing \
+ gaps between ranks. This function provides a ranking similar
to `row_number`, but \
+ skips ranks for identical values.",
+ )
+ .with_syntax_example("rank()")
+ .build()
+ .unwrap()
+ })
+}
+
+static DENSE_RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
+
+fn get_dense_rank_doc() -> &'static Documentation {
+ DENSE_RANK_DOCUMENTATION.get_or_init(|| {
+ Documentation::builder()
+ .with_doc_section(DOC_SECTION_RANKING)
+ .with_description(
+ "Returns the rank of the current row without gaps. This
function ranks \
+ rows in a dense manner, meaning consecutive ranks are assigned
even for identical \
+ values.",
+ )
+ .with_syntax_example("dense_rank()")
+ .build()
+ .unwrap()
+ })
+}
+
+static PERCENT_RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
+
+fn get_percent_rank_doc() -> &'static Documentation {
+ PERCENT_RANK_DOCUMENTATION.get_or_init(|| {
+ Documentation::builder()
+ .with_doc_section(DOC_SECTION_RANKING)
+ .with_description(
+ "Returns the percentage rank of the current row within its
partition. \
+ The value ranges from 0 to 1 and is computed as `(rank - 1) /
(total_rows - 1)`.",
+ )
+ .with_syntax_example("percent_rank()")
+ .build()
+ .unwrap()
+ })
}
impl WindowUDFImpl for Rank {
@@ -69,7 +158,7 @@ impl WindowUDFImpl for Rank {
}
fn name(&self) -> &str {
- "rank"
+ &self.name
}
fn signature(&self) -> &Signature {
@@ -80,11 +169,20 @@ impl WindowUDFImpl for Rank {
&self,
_partition_evaluator_args: PartitionEvaluatorArgs,
) -> Result<Box<dyn PartitionEvaluator>> {
- Ok(Box::<RankEvaluator>::default())
+ Ok(Box::new(RankEvaluator {
+ state: RankState::default(),
+ rank_type: self.rank_type,
+ }))
}
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
- Ok(Field::new(field_args.name(), DataType::UInt64, false))
+ let return_type = match self.rank_type {
+ RankType::Basic | RankType::Dense => DataType::UInt64,
+ RankType::Percent => DataType::Float64,
+ };
+
+ let nullable = false;
+ Ok(Field::new(field_args.name(), return_type, nullable))
}
fn sort_options(&self) -> Option<SortOptions> {
@@ -93,6 +191,14 @@ impl WindowUDFImpl for Rank {
nulls_first: false,
})
}
+
+ fn documentation(&self) -> Option<&Documentation> {
+ match self.rank_type {
+ RankType::Basic => Some(get_rank_doc()),
+ RankType::Dense => Some(get_dense_rank_doc()),
+ RankType::Percent => Some(get_percent_rank_doc()),
+ }
+ }
}
/// State for the RANK(rank) built-in window function.
@@ -109,15 +215,15 @@ pub struct RankState {
}
/// State for the `rank` built-in window function.
-#[derive(Debug, Default)]
+#[derive(Debug)]
struct RankEvaluator {
state: RankState,
+ rank_type: RankType,
}
impl PartitionEvaluator for RankEvaluator {
fn is_causal(&self) -> bool {
- // The rank function doesn't need "future" values to emit results:
- true
+ matches!(self.rank_type, RankType::Basic | RankType::Dense)
}
fn evaluate(
@@ -147,33 +253,68 @@ impl PartitionEvaluator for RankEvaluator {
self.state.current_group_count += 1;
}
- Ok(ScalarValue::UInt64(Some(
- self.state.last_rank_boundary as u64 + 1,
- )))
+ match self.rank_type {
+ RankType::Basic => Ok(ScalarValue::UInt64(Some(
+ self.state.last_rank_boundary as u64 + 1,
+ ))),
+ RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank
as u64))),
+ RankType::Percent => {
+ exec_err!("Can not execute PERCENT_RANK in a streaming
fashion")
+ }
+ }
}
fn evaluate_all_with_rank(
&self,
- _num_rows: usize,
+ num_rows: usize,
ranks_in_partition: &[Range<usize>],
) -> Result<ArrayRef> {
- let result = Arc::new(UInt64Array::from_iter_values(
- ranks_in_partition
- .iter()
- .scan(1_u64, |acc, range| {
- let len = range.end - range.start;
- let result = iter::repeat(*acc).take(len);
- *acc += len as u64;
- Some(result)
- })
- .flatten(),
- ));
+ let result: ArrayRef = match self.rank_type {
+ RankType::Basic => Arc::new(UInt64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .scan(1_u64, |acc, range| {
+ let len = range.end - range.start;
+ let result = iter::repeat(*acc).take(len);
+ *acc += len as u64;
+ Some(result)
+ })
+ .flatten(),
+ )),
+
+ RankType::Dense => Arc::new(UInt64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .zip(1u64..)
+ .flat_map(|(range, rank)| {
+ let len = range.end - range.start;
+ iter::repeat(rank).take(len)
+ }),
+ )),
+
+ RankType::Percent => {
+ let denominator = num_rows as f64;
+
+ Arc::new(Float64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .scan(0_u64, |acc, range| {
+ let len = range.end - range.start;
+ let value = (*acc as f64) / (denominator -
1.0).max(1.0);
+ let result = iter::repeat(value).take(len);
+ *acc += len as u64;
+ Some(result)
+ })
+ .flatten(),
+ ))
+ }
+ };
Ok(result)
}
fn supports_bounded_execution(&self) -> bool {
- true
+ matches!(self.rank_type, RankType::Basic | RankType::Dense)
}
fn include_rank(&self) -> bool {
@@ -184,7 +325,7 @@ impl PartitionEvaluator for RankEvaluator {
#[cfg(test)]
mod tests {
use super::*;
- use datafusion_common::cast::as_uint64_array;
+ use datafusion_common::cast::{as_float64_array, as_uint64_array};
fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
@@ -210,11 +351,59 @@ mod tests {
Ok(())
}
+ fn test_f64_result(
+ expr: &Rank,
+ num_rows: usize,
+ ranks: Vec<Range<usize>>,
+ expected: Vec<f64>,
+ ) -> Result<()> {
+ let args = PartitionEvaluatorArgs::default();
+ let result = expr
+ .partition_evaluator(args)?
+ .evaluate_all_with_rank(num_rows, &ranks)?;
+ let result = as_float64_array(&result)?;
+ let result = result.values();
+ assert_eq!(expected, *result);
+ Ok(())
+ }
+
#[test]
fn test_rank() -> Result<()> {
- let r = Rank::default();
+ let r = Rank::basic();
test_without_rank(&r, vec![1; 8])?;
test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
Ok(())
}
+
+ #[test]
+ fn test_dense_rank() -> Result<()> {
+ let r = Rank::dense_rank();
+ test_without_rank(&r, vec![1; 8])?;
+ test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
+ Ok(())
+ }
+
+ #[test]
+ #[allow(clippy::single_range_in_vec_init)]
+ fn test_percent_rank() -> Result<()> {
+ let r = Rank::percent_rank();
+
+ // empty case
+ let expected = vec![0.0; 0];
+ test_f64_result(&r, 0, vec![0..0; 0], expected)?;
+
+ // singleton case
+ let expected = vec![0.0];
+ test_f64_result(&r, 1, vec![0..1], expected)?;
+
+ // uniform case
+ let expected = vec![0.0; 7];
+ test_f64_result(&r, 7, vec![0..7], expected)?;
+
+ // non-trivial case
+ let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
+ test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;
+
+ Ok(())
+ }
}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8c9b368598..ffa8fc1eef 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -47,9 +47,7 @@ use datafusion::functions_aggregate::expr_fn::{
};
use datafusion::functions_aggregate::min_max::max_udaf;
use datafusion::functions_nested::map::map;
-use datafusion::functions_window::dense_rank::dense_rank;
-use datafusion::functions_window::percent_rank::percent_rank;
-use datafusion::functions_window::rank::{rank, rank_udwf};
+use datafusion::functions_window::rank::{dense_rank, percent_rank, rank,
rank_udwf};
use datafusion::functions_window::row_number::row_number;
use datafusion::prelude::*;
use datafusion::test_util::{TestTableFactory, TestTableProvider};
diff --git a/docs/source/user-guide/sql/window_functions_new.md
b/docs/source/user-guide/sql/window_functions_new.md
index ee981911f5..462fc900d1 100644
--- a/docs/source/user-guide/sql/window_functions_new.md
+++ b/docs/source/user-guide/sql/window_functions_new.md
@@ -157,8 +157,35 @@ All [aggregate functions](aggregate_functions.md) can be
used as window function
## Ranking Functions
+- [dense_rank](#dense_rank)
+- [percent_rank](#percent_rank)
+- [rank](#rank)
- [row_number](#row_number)
+### `dense_rank`
+
+Returns the rank of the current row without gaps. This function ranks rows in
a dense manner, meaning consecutive ranks are assigned even for identical
values.
+
+```
+dense_rank()
+```
+
+### `percent_rank`
+
+Returns the percentage rank of the current row within its partition. The value
ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.
+
+```
+percent_rank()
+```
+
+### `rank`
+
+Returns the rank of the current row within its partition, allowing gaps
between ranks. This function provides a ranking similar to `row_number`, but
skips ranks for identical values.
+
+```
+rank()
+```
+
### `row_number`
Number of the current row within its partition, counting from 1.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]