This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 939ef9e9bc Convert `rank` / `dense_rank` and `percent_rank` builtin
functions to UDWF (#12718)
939ef9e9bc is described below
commit 939ef9e9bcf95be4215569609d189f2a94280d86
Author: Jagdish Parihar <[email protected]>
AuthorDate: Fri Oct 11 01:58:35 2024 +0530
Convert `rank` / `dense_rank` and `percent_rank` builtin functions to UDWF
(#12718)
* wip: converting rank builtin function to UDWF
* commented out BuiltInWindowFunction in datafusion.proto and fixed an issue
related to DataFusion window functions
* implemented rank.rs, percent_rank.rs and dense_rank.rs in
datafusion-functions-window
* removed a percent_rank test from the built-in window function tests and
updated pbjson fields
* removed unnecessary code
* added window_functions field to the MockSessionState
* updated rank, percent_rank and dense_rank udwf to use macros
* wip: fix rank functionality in sql integration
* fixed rank udwf not found issue in sql_integration.rs
* evaluating rank, percent_rank and dense_rank UDWFs with the evaluate_all_with_rank
function
* fixed rank projection test
* wip: fixing the percent_rank() documentation
* fixed the docs error issue
* fixed data type of the percent_rank udwf
* updated prost.rs file
* updated test and documentation
* Fix logical conflicts
* tweak module documentation
---------
Co-authored-by: jatin <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/tests/fuzz_cases/window_fuzz.rs | 23 +-
datafusion/expr/src/built_in_window_function.rs | 25 +-
datafusion/expr/src/expr.rs | 12 -
datafusion/expr/src/expr_fn.rs | 4 +-
datafusion/expr/src/window_function.rs | 21 --
datafusion/functions-window/src/dense_rank.rs | 205 ++++++++++++++
datafusion/functions-window/src/lib.rs | 13 +-
datafusion/functions-window/src/percent_rank.rs | 192 +++++++++++++
datafusion/functions-window/src/rank.rs | 220 +++++++++++++++
datafusion/functions-window/src/row_number.rs | 2 +-
datafusion/physical-expr/src/expressions/mod.rs | 1 -
datafusion/physical-expr/src/window/mod.rs | 1 -
datafusion/physical-expr/src/window/rank.rs | 313 ---------------------
datafusion/physical-expr/src/window/window_expr.rs | 13 -
datafusion/physical-plan/src/windows/mod.rs | 8 +-
datafusion/proto/proto/datafusion.proto | 8 +-
datafusion/proto/src/generated/pbjson.rs | 9 -
datafusion/proto/src/generated/prost.rs | 12 +-
datafusion/proto/src/logical_plan/from_proto.rs | 3 -
datafusion/proto/src/logical_plan/to_proto.rs | 3 -
datafusion/proto/src/physical_plan/to_proto.rs | 13 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 18 +-
datafusion/sql/tests/common/mod.rs | 11 +-
datafusion/sql/tests/sql_integration.rs | 6 +-
.../sqllogictest/test_files/subquery_sort.slt | 8 +-
datafusion/sqllogictest/test_files/window.slt | 8 +-
datafusion/substrait/tests/cases/serialize.rs | 4 +-
27 files changed, 687 insertions(+), 469 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index a6c2cf700c..b9881c9f23 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -45,6 +45,8 @@ use datafusion_physical_expr::{PhysicalExpr,
PhysicalSortExpr};
use test_utils::add_empty_batches;
use datafusion::functions_window::row_number::row_number_udwf;
+use datafusion_functions_window::dense_rank::dense_rank_udwf;
+use datafusion_functions_window::rank::rank_udwf;
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
@@ -224,9 +226,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
-
WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Rank),
+ WindowFunctionDefinition::WindowUDF(rank_udwf()),
// its name
- "RANK",
+ "rank",
// no argument
vec![],
// Expected causality, for None cases causality will be determined
from window frame boundaries
@@ -238,11 +240,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
- WindowFunctionDefinition::BuiltInWindowFunction(
- BuiltInWindowFunction::DenseRank,
- ),
+ WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
// its name
- "DENSE_RANK",
+ "dense_rank",
// no argument
vec![],
// Expected causality, for None cases causality will be determined
from window frame boundaries
@@ -382,19 +382,12 @@ fn get_random_function(
);
window_fn_map.insert(
"rank",
- (
- WindowFunctionDefinition::BuiltInWindowFunction(
- BuiltInWindowFunction::Rank,
- ),
- vec![],
- ),
+ (WindowFunctionDefinition::WindowUDF(rank_udwf()), vec![]),
);
window_fn_map.insert(
"dense_rank",
(
- WindowFunctionDefinition::BuiltInWindowFunction(
- BuiltInWindowFunction::DenseRank,
- ),
+ WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
vec![],
),
);
diff --git a/datafusion/expr/src/built_in_window_function.rs
b/datafusion/expr/src/built_in_window_function.rs
index b136d6cace..117ff08253 100644
--- a/datafusion/expr/src/built_in_window_function.rs
+++ b/datafusion/expr/src/built_in_window_function.rs
@@ -40,12 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
- /// rank of the current row with gaps; same as row_number of its first peer
- Rank,
- /// rank of the current row without gaps; this function counts peer groups
- DenseRank,
- /// relative rank of the current row: (rank - 1) / (total rows - 1)
- PercentRank,
/// relative rank of the current row: (number of rows preceding or peer
with current row) / (total rows)
CumeDist,
/// integer ranging from 1 to the argument value, dividing the partition
as equally as possible
@@ -72,9 +66,6 @@ impl BuiltInWindowFunction {
pub fn name(&self) -> &str {
use BuiltInWindowFunction::*;
match self {
- Rank => "RANK",
- DenseRank => "DENSE_RANK",
- PercentRank => "PERCENT_RANK",
CumeDist => "CUME_DIST",
Ntile => "NTILE",
Lag => "LAG",
@@ -90,9 +81,6 @@ impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name.to_uppercase().as_str() {
- "RANK" => BuiltInWindowFunction::Rank,
- "DENSE_RANK" => BuiltInWindowFunction::DenseRank,
- "PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
"CUME_DIST" => BuiltInWindowFunction::CumeDist,
"NTILE" => BuiltInWindowFunction::Ntile,
"LAG" => BuiltInWindowFunction::Lag,
@@ -127,12 +115,8 @@ impl BuiltInWindowFunction {
})?;
match self {
- BuiltInWindowFunction::Rank
- | BuiltInWindowFunction::DenseRank
- | BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
- BuiltInWindowFunction::PercentRank |
BuiltInWindowFunction::CumeDist => {
- Ok(DataType::Float64)
- }
+ BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
+ BuiltInWindowFunction::CumeDist => Ok(DataType::Float64),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
@@ -145,10 +129,7 @@ impl BuiltInWindowFunction {
pub fn signature(&self) -> Signature {
// note: the physical expression must accept the type returned by this
function or the execution panics.
match self {
- BuiltInWindowFunction::Rank
- | BuiltInWindowFunction::DenseRank
- | BuiltInWindowFunction::PercentRank
- | BuiltInWindowFunction::CumeDist => Signature::any(0,
Volatility::Immutable),
+ BuiltInWindowFunction::CumeDist => Signature::any(0,
Volatility::Immutable),
BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => {
Signature::one_of(
vec![
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 02a2edb980..723433f573 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -2598,15 +2598,6 @@ mod test {
Ok(())
}
- #[test]
- fn test_percent_rank_return_type() -> Result<()> {
- let fun = find_df_window_func("percent_rank").unwrap();
- let observed = fun.return_type(&[], &[], "")?;
- assert_eq!(DataType::Float64, observed);
-
- Ok(())
- }
-
#[test]
fn test_cume_dist_return_type() -> Result<()> {
let fun = find_df_window_func("cume_dist").unwrap();
@@ -2628,9 +2619,6 @@ mod test {
#[test]
fn test_window_function_case_insensitive() -> Result<()> {
let names = vec![
- "rank",
- "dense_rank",
- "percent_rank",
"cume_dist",
"ntile",
"lag",
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index ea053b9fb1..7fd4e64e0e 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -701,7 +701,6 @@ pub fn interval_month_day_nano_lit(value: &str) -> Expr {
/// # use datafusion_expr::test::function_stub::count;
/// # use sqlparser::ast::NullTreatment;
/// # use datafusion_expr::{ExprFunctionExt, lit, Expr, col};
-/// # use datafusion_expr::window_function::percent_rank;
/// # // first_value is an aggregate function in another crate
/// # fn first_value(_arg: Expr) -> Expr {
/// unimplemented!() }
@@ -721,6 +720,9 @@ pub fn interval_month_day_nano_lit(value: &str) -> Expr {
/// // Create a window expression for percent rank partitioned on column a
/// // equivalent to:
/// // `PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS LAST IGNORE
NULLS)`
+/// // percent_rank is a UDWF defined in another crate
+/// # fn percent_rank() -> Expr {
+/// unimplemented!() }
/// let window = percent_rank()
/// .partition_by(vec![col("a")])
/// .order_by(vec![col("b").sort(true, true)])
diff --git a/datafusion/expr/src/window_function.rs
b/datafusion/expr/src/window_function.rs
index a80718147c..7ac6fb7d16 100644
--- a/datafusion/expr/src/window_function.rs
+++ b/datafusion/expr/src/window_function.rs
@@ -19,27 +19,6 @@ use datafusion_common::ScalarValue;
use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};
-/// Create an expression to represent the `rank` window function
-pub fn rank() -> Expr {
- Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Rank,
vec![]))
-}
-
-/// Create an expression to represent the `dense_rank` window function
-pub fn dense_rank() -> Expr {
- Expr::WindowFunction(WindowFunction::new(
- BuiltInWindowFunction::DenseRank,
- vec![],
- ))
-}
-
-/// Create an expression to represent the `percent_rank` window function
-pub fn percent_rank() -> Expr {
- Expr::WindowFunction(WindowFunction::new(
- BuiltInWindowFunction::PercentRank,
- vec![],
- ))
-}
-
/// Create an expression to represent the `cume_dist` window function
pub fn cume_dist() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::CumeDist,
vec![]))
diff --git a/datafusion/functions-window/src/dense_rank.rs
b/datafusion/functions-window/src/dense_rank.rs
new file mode 100644
index 0000000000..c969a7c46f
--- /dev/null
+++ b/datafusion/functions-window/src/dense_rank.rs
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `dense_rank` window function implementation
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::iter;
+use std::ops::Range;
+use std::sync::Arc;
+
+use crate::define_udwf_and_expr;
+use crate::rank::RankState;
+use datafusion_common::arrow::array::ArrayRef;
+use datafusion_common::arrow::array::UInt64Array;
+use datafusion_common::arrow::compute::SortOptions;
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
+use datafusion_common::utils::get_row_at_idx;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::{PartitionEvaluator, Signature, Volatility,
WindowUDFImpl};
+use datafusion_functions_window_common::field;
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use field::WindowUDFFieldArgs;
+
+define_udwf_and_expr!(
+ DenseRank,
+ dense_rank,
+ "Returns rank of the current row without gaps. This function counts peer
groups"
+);
+
+/// dense_rank expression
+#[derive(Debug)]
+pub struct DenseRank {
+ signature: Signature,
+}
+
+impl DenseRank {
+ /// Create a new `dense_rank` function
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::any(0, Volatility::Immutable),
+ }
+ }
+}
+
+impl Default for DenseRank {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl WindowUDFImpl for DenseRank {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "dense_rank"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn partition_evaluator(
+ &self,
+ _partition_evaluator_args: PartitionEvaluatorArgs,
+ ) -> Result<Box<dyn PartitionEvaluator>> {
+ Ok(Box::<DenseRankEvaluator>::default())
+ }
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(field_args.name(), DataType::UInt64, false))
+ }
+
+ fn sort_options(&self) -> Option<SortOptions> {
+ Some(SortOptions {
+ descending: false,
+ nulls_first: false,
+ })
+ }
+}
+
+/// Evaluator for the `dense_rank` window function; reuses [`RankState`] from `rank`.
+#[derive(Debug, Default)]
+struct DenseRankEvaluator {
+ state: RankState,
+}
+
+impl PartitionEvaluator for DenseRankEvaluator {
+ fn is_causal(&self) -> bool {
+ // The dense_rank function doesn't need "future" values to emit
results:
+ true
+ }
+
+ fn evaluate(
+ &mut self,
+ values: &[ArrayRef],
+ range: &Range<usize>,
+ ) -> Result<ScalarValue> {
+ let row_idx = range.start;
+ // There is no argument, values are order by column values (where rank
is calculated)
+ let range_columns = values;
+ let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
+ let new_rank_encountered =
+ if let Some(state_last_rank_data) = &self.state.last_rank_data {
+ // if rank data changes, new rank is encountered
+ state_last_rank_data != &last_rank_data
+ } else {
+ // First rank seen
+ true
+ };
+
+ if new_rank_encountered {
+ self.state.last_rank_data = Some(last_rank_data);
+ self.state.last_rank_boundary += self.state.current_group_count;
+ self.state.current_group_count = 1;
+ self.state.n_rank += 1;
+ } else {
+ // data is still in the same rank
+ self.state.current_group_count += 1;
+ }
+
+ Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64)))
+ }
+
+ fn evaluate_all_with_rank(
+ &self,
+ _num_rows: usize,
+ ranks_in_partition: &[Range<usize>],
+ ) -> Result<ArrayRef> {
+ let result = Arc::new(UInt64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .zip(1u64..)
+ .flat_map(|(range, rank)| {
+ let len = range.end - range.start;
+ iter::repeat(rank).take(len)
+ }),
+ ));
+
+ Ok(result)
+ }
+
+ fn supports_bounded_execution(&self) -> bool {
+ true
+ }
+
+ fn include_rank(&self) -> bool {
+ true
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use datafusion_common::cast::as_uint64_array;
+
+ fn test_with_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
+ test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
+ }
+
+ #[allow(clippy::single_range_in_vec_init)]
+ fn test_without_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
+ test_i32_result(expr, vec![0..8], expected)
+ }
+
+ fn test_i32_result(
+ expr: &DenseRank,
+ ranks: Vec<Range<usize>>,
+ expected: Vec<u64>,
+ ) -> Result<()> {
+ let args = PartitionEvaluatorArgs::default();
+ let result = expr
+ .partition_evaluator(args)?
+ .evaluate_all_with_rank(8, &ranks)?;
+ let result = as_uint64_array(&result)?;
+ let result = result.values();
+ assert_eq!(expected, *result);
+ Ok(())
+ }
+
+ #[test]
+ fn test_dense_rank() -> Result<()> {
+ let r = DenseRank::default();
+ test_without_rank(&r, vec![1; 8])?;
+ test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
+ Ok(())
+ }
+}
diff --git a/datafusion/functions-window/src/lib.rs
b/datafusion/functions-window/src/lib.rs
index 6e98bb0914..b727809908 100644
--- a/datafusion/functions-window/src/lib.rs
+++ b/datafusion/functions-window/src/lib.rs
@@ -31,16 +31,27 @@ use datafusion_expr::WindowUDF;
#[macro_use]
pub mod macros;
+pub mod dense_rank;
+pub mod percent_rank;
+pub mod rank;
pub mod row_number;
/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
+ pub use super::dense_rank::dense_rank;
+ pub use super::percent_rank::percent_rank;
+ pub use super::rank::rank;
pub use super::row_number::row_number;
}
/// Returns all default window functions
pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
- vec![row_number::row_number_udwf()]
+ vec![
+ row_number::row_number_udwf(),
+ rank::rank_udwf(),
+ dense_rank::dense_rank_udwf(),
+ percent_rank::percent_rank_udwf(),
+ ]
}
/// Registers all enabled packages with a [`FunctionRegistry`]
pub fn register_all(
diff --git a/datafusion/functions-window/src/percent_rank.rs
b/datafusion/functions-window/src/percent_rank.rs
new file mode 100644
index 0000000000..147959f69b
--- /dev/null
+++ b/datafusion/functions-window/src/percent_rank.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `percent_rank` window function implementation
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::iter;
+use std::ops::Range;
+use std::sync::Arc;
+
+use crate::define_udwf_and_expr;
+use datafusion_common::arrow::array::ArrayRef;
+use datafusion_common::arrow::array::Float64Array;
+use datafusion_common::arrow::compute::SortOptions;
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
+use datafusion_common::{exec_err, Result, ScalarValue};
+use datafusion_expr::{PartitionEvaluator, Signature, Volatility,
WindowUDFImpl};
+use datafusion_functions_window_common::field;
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use field::WindowUDFFieldArgs;
+
+define_udwf_and_expr!(
+ PercentRank,
+ percent_rank,
+ "Returns the relative rank of the current row: (rank - 1) / (total rows -
1)"
+);
+
+/// percent_rank expression
+#[derive(Debug)]
+pub struct PercentRank {
+ signature: Signature,
+}
+
+impl PercentRank {
+ /// Create a new `percent_rank` function
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::any(0, Volatility::Immutable),
+ }
+ }
+}
+
+impl Default for PercentRank {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl WindowUDFImpl for PercentRank {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "percent_rank"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn partition_evaluator(
+ &self,
+ _partition_evaluator_args: PartitionEvaluatorArgs,
+ ) -> Result<Box<dyn PartitionEvaluator>> {
+ Ok(Box::<PercentRankEvaluator>::default())
+ }
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(field_args.name(), DataType::Float64, false))
+ }
+
+ fn sort_options(&self) -> Option<SortOptions> {
+ Some(SortOptions {
+ descending: false,
+ nulls_first: false,
+ })
+ }
+}
+
+/// Evaluator for the `percent_rank` window function (it holds no state).
+#[derive(Debug, Default)]
+struct PercentRankEvaluator {}
+
+impl PartitionEvaluator for PercentRankEvaluator {
+ fn is_causal(&self) -> bool {
+ // The percent_rank function needs "future" values (the partition's total row count) to emit
results:
+ false
+ }
+
+ fn evaluate(
+ &mut self,
+ _values: &[ArrayRef],
+ _range: &Range<usize>,
+ ) -> Result<ScalarValue> {
+ exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
+ }
+
+ fn evaluate_all_with_rank(
+ &self,
+ num_rows: usize,
+ ranks_in_partition: &[Range<usize>],
+ ) -> Result<ArrayRef> {
+ let denominator = num_rows as f64;
+ let result =
+ // Returns the relative rank of the current row, that is (rank - 1) /
(total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
+ Arc::new(Float64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .scan(0_u64, |acc, range| {
+ let len = range.end - range.start;
+ let value = (*acc as f64) / (denominator - 1.0).max(1.0);
+ let result = iter::repeat(value).take(len);
+ *acc += len as u64;
+ Some(result)
+ })
+ .flatten(),
+ ));
+
+ Ok(result)
+ }
+
+ fn supports_bounded_execution(&self) -> bool {
+ false
+ }
+
+ fn include_rank(&self) -> bool {
+ true
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use datafusion_common::cast::as_float64_array;
+
+ fn test_f64_result(
+ expr: &PercentRank,
+ num_rows: usize,
+ ranks: Vec<Range<usize>>,
+ expected: Vec<f64>,
+ ) -> Result<()> {
+ let args = PartitionEvaluatorArgs::default();
+ let result = expr
+ .partition_evaluator(args)?
+ .evaluate_all_with_rank(num_rows, &ranks)?;
+ let result = as_float64_array(&result)?;
+ let result = result.values();
+ assert_eq!(expected, *result);
+ Ok(())
+ }
+
+ #[test]
+ #[allow(clippy::single_range_in_vec_init)]
+ fn test_percent_rank() -> Result<()> {
+ let r = PercentRank::default();
+
+ // empty case
+ let expected = vec![0.0; 0];
+ test_f64_result(&r, 0, vec![0..0; 0], expected)?;
+
+ // singleton case
+ let expected = vec![0.0];
+ test_f64_result(&r, 1, vec![0..1], expected)?;
+
+ // uniform case
+ let expected = vec![0.0; 7];
+ test_f64_result(&r, 7, vec![0..7], expected)?;
+
+ // non-trivial case
+ let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
+ test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;
+
+ Ok(())
+ }
+}
diff --git a/datafusion/functions-window/src/rank.rs
b/datafusion/functions-window/src/rank.rs
new file mode 100644
index 0000000000..c52dec9061
--- /dev/null
+++ b/datafusion/functions-window/src/rank.rs
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `rank` window function implementation
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::iter;
+use std::ops::Range;
+use std::sync::Arc;
+
+use crate::define_udwf_and_expr;
+use datafusion_common::arrow::array::ArrayRef;
+use datafusion_common::arrow::array::UInt64Array;
+use datafusion_common::arrow::compute::SortOptions;
+use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
+use datafusion_common::utils::get_row_at_idx;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::{PartitionEvaluator, Signature, Volatility,
WindowUDFImpl};
+use datafusion_functions_window_common::field;
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use field::WindowUDFFieldArgs;
+
+define_udwf_and_expr!(
+ Rank,
+ rank,
+ "Returns rank of the current row with gaps. Same as `row_number` of its
first peer"
+);
+
+/// rank expression
+#[derive(Debug)]
+pub struct Rank {
+ signature: Signature,
+}
+
+impl Rank {
+ /// Create a new `rank` function
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::any(0, Volatility::Immutable),
+ }
+ }
+}
+
+impl Default for Rank {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl WindowUDFImpl for Rank {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "rank"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn partition_evaluator(
+ &self,
+ _partition_evaluator_args: PartitionEvaluatorArgs,
+ ) -> Result<Box<dyn PartitionEvaluator>> {
+ Ok(Box::<RankEvaluator>::default())
+ }
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(field_args.name(), DataType::UInt64, false))
+ }
+
+ fn sort_options(&self) -> Option<SortOptions> {
+ Some(SortOptions {
+ descending: false,
+ nulls_first: false,
+ })
+ }
+}
+
+/// Incremental state shared by the `rank` and `dense_rank` evaluators.
+#[derive(Debug, Clone, Default)]
+pub struct RankState {
+ /// ORDER BY values of the most recent row; when they change, a new rank starts
+ pub last_rank_data: Option<Vec<ScalarValue>>,
+ /// Row offset (0-based) at which the current peer group starts
+ pub last_rank_boundary: usize,
+ /// Number of rows seen so far in the current peer group
+ pub current_group_count: usize,
+ /// Number of distinct peer groups seen so far (the current dense rank)
+ pub n_rank: usize,
+}
+
+/// State for the `rank` built-in window function.
+#[derive(Debug, Default)]
+struct RankEvaluator {
+ state: RankState,
+}
+
+impl PartitionEvaluator for RankEvaluator {
+ fn is_causal(&self) -> bool {
+ // The rank function doesn't need "future" values to emit results:
+ true
+ }
+
+ fn evaluate(
+ &mut self,
+ values: &[ArrayRef],
+ range: &Range<usize>,
+ ) -> Result<ScalarValue> {
+ let row_idx = range.start;
+ // There is no argument, values are order by column values (where rank
is calculated)
+ let range_columns = values;
+ let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
+ let new_rank_encountered =
+ if let Some(state_last_rank_data) = &self.state.last_rank_data {
+ // if rank data changes, new rank is encountered
+ state_last_rank_data != &last_rank_data
+ } else {
+ // First rank seen
+ true
+ };
+ if new_rank_encountered {
+ self.state.last_rank_data = Some(last_rank_data);
+ self.state.last_rank_boundary += self.state.current_group_count;
+ self.state.current_group_count = 1;
+ self.state.n_rank += 1;
+ } else {
+ // data is still in the same rank
+ self.state.current_group_count += 1;
+ }
+
+ Ok(ScalarValue::UInt64(Some(
+ self.state.last_rank_boundary as u64 + 1,
+ )))
+ }
+
+ fn evaluate_all_with_rank(
+ &self,
+ _num_rows: usize,
+ ranks_in_partition: &[Range<usize>],
+ ) -> Result<ArrayRef> {
+ let result = Arc::new(UInt64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .scan(1_u64, |acc, range| {
+ let len = range.end - range.start;
+ let result = iter::repeat(*acc).take(len);
+ *acc += len as u64;
+ Some(result)
+ })
+ .flatten(),
+ ));
+
+ Ok(result)
+ }
+
+ fn supports_bounded_execution(&self) -> bool {
+ true
+ }
+
+ fn include_rank(&self) -> bool {
+ true
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use datafusion_common::cast::as_uint64_array;
+
+ fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
+ test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
+ }
+
+ #[allow(clippy::single_range_in_vec_init)]
+ fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
+ test_i32_result(expr, vec![0..8], expected)
+ }
+
+ fn test_i32_result(
+ expr: &Rank,
+ ranks: Vec<Range<usize>>,
+ expected: Vec<u64>,
+ ) -> Result<()> {
+ let args = PartitionEvaluatorArgs::default();
+ let result = expr
+ .partition_evaluator(args)?
+ .evaluate_all_with_rank(8, &ranks)?;
+ let result = as_uint64_array(&result)?;
+ let result = result.values();
+ assert_eq!(expected, *result);
+ Ok(())
+ }
+
+ #[test]
+ fn test_rank() -> Result<()> {
+ let r = Rank::default();
+ test_without_rank(&r, vec![1; 8])?;
+ test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
+ Ok(())
+ }
+}
diff --git a/datafusion/functions-window/src/row_number.rs
b/datafusion/functions-window/src/row_number.rs
index ca4372bef2..56af14fb84 100644
--- a/datafusion/functions-window/src/row_number.rs
+++ b/datafusion/functions-window/src/row_number.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines physical expression for `row_number` that can evaluated at runtime
during query execution
+//! `row_number` window function implementation
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::array::UInt64Array;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index 177fd799ae..e07e11e431 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -39,7 +39,6 @@ pub use crate::window::cume_dist::{cume_dist, CumeDist};
pub use crate::window::lead_lag::{lag, lead, WindowShift};
pub use crate::window::nth_value::NthValue;
pub use crate::window::ntile::Ntile;
-pub use crate::window::rank::{dense_rank, percent_rank, rank, Rank, RankType};
pub use crate::PhysicalSortExpr;
pub use binary::{binary, similar_to, BinaryExpr};
diff --git a/datafusion/physical-expr/src/window/mod.rs
b/datafusion/physical-expr/src/window/mod.rs
index 2aeb053331..938bdac50f 100644
--- a/datafusion/physical-expr/src/window/mod.rs
+++ b/datafusion/physical-expr/src/window/mod.rs
@@ -22,7 +22,6 @@ pub(crate) mod cume_dist;
pub(crate) mod lead_lag;
pub(crate) mod nth_value;
pub(crate) mod ntile;
-pub(crate) mod rank;
mod sliding_aggregate;
mod window_expr;
diff --git a/datafusion/physical-expr/src/window/rank.rs
b/datafusion/physical-expr/src/window/rank.rs
deleted file mode 100644
index fa3d4e487f..0000000000
--- a/datafusion/physical-expr/src/window/rank.rs
+++ /dev/null
@@ -1,313 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines physical expression for `rank`, `dense_rank`, and `percent_rank`
that can evaluated
-//! at runtime during query execution
-
-use crate::expressions::Column;
-use crate::window::window_expr::RankState;
-use crate::window::BuiltInWindowFunctionExpr;
-use crate::{PhysicalExpr, PhysicalSortExpr};
-
-use arrow::array::ArrayRef;
-use arrow::array::{Float64Array, UInt64Array};
-use arrow::datatypes::{DataType, Field};
-use arrow_schema::{SchemaRef, SortOptions};
-use datafusion_common::utils::get_row_at_idx;
-use datafusion_common::{exec_err, Result, ScalarValue};
-use datafusion_expr::PartitionEvaluator;
-
-use std::any::Any;
-use std::iter;
-use std::ops::Range;
-use std::sync::Arc;
-
-/// Rank calculates the rank in the window function with order by
-#[derive(Debug)]
-pub struct Rank {
- name: String,
- rank_type: RankType,
- /// Output data type
- data_type: DataType,
-}
-
-impl Rank {
- /// Get rank_type of the rank in window function with order by
- pub fn get_type(&self) -> RankType {
- self.rank_type
- }
-}
-
-#[derive(Debug, Copy, Clone)]
-pub enum RankType {
- Basic,
- Dense,
- Percent,
-}
-
-/// Create a rank window function
-pub fn rank(name: String, data_type: &DataType) -> Rank {
- Rank {
- name,
- rank_type: RankType::Basic,
- data_type: data_type.clone(),
- }
-}
-
-/// Create a dense rank window function
-pub fn dense_rank(name: String, data_type: &DataType) -> Rank {
- Rank {
- name,
- rank_type: RankType::Dense,
- data_type: data_type.clone(),
- }
-}
-
-/// Create a percent rank window function
-pub fn percent_rank(name: String, data_type: &DataType) -> Rank {
- Rank {
- name,
- rank_type: RankType::Percent,
- data_type: data_type.clone(),
- }
-}
-
-impl BuiltInWindowFunctionExpr for Rank {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- let nullable = false;
- Ok(Field::new(self.name(), self.data_type.clone(), nullable))
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-
- fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
- Ok(Box::new(RankEvaluator {
- state: RankState::default(),
- rank_type: self.rank_type,
- }))
- }
-
- fn get_result_ordering(&self, schema: &SchemaRef) ->
Option<PhysicalSortExpr> {
- // The built-in RANK window function (in all modes) introduces a new
ordering:
- schema.column_with_name(self.name()).map(|(idx, field)| {
- let expr = Arc::new(Column::new(field.name(), idx));
- let options = SortOptions {
- descending: false,
- nulls_first: false,
- }; // ASC, NULLS LAST
- PhysicalSortExpr { expr, options }
- })
- }
-}
-
-#[derive(Debug)]
-pub(crate) struct RankEvaluator {
- state: RankState,
- rank_type: RankType,
-}
-
-impl PartitionEvaluator for RankEvaluator {
- fn is_causal(&self) -> bool {
- matches!(self.rank_type, RankType::Basic | RankType::Dense)
- }
-
- /// Evaluates the window function inside the given range.
- fn evaluate(
- &mut self,
- values: &[ArrayRef],
- range: &Range<usize>,
- ) -> Result<ScalarValue> {
- let row_idx = range.start;
- // There is no argument, values are order by column values (where rank
is calculated)
- let range_columns = values;
- let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
- let new_rank_encountered =
- if let Some(state_last_rank_data) = &self.state.last_rank_data {
- // if rank data changes, new rank is encountered
- state_last_rank_data != &last_rank_data
- } else {
- // First rank seen
- true
- };
- if new_rank_encountered {
- self.state.last_rank_data = Some(last_rank_data);
- self.state.last_rank_boundary += self.state.current_group_count;
- self.state.current_group_count = 1;
- self.state.n_rank += 1;
- } else {
- // data is still in the same rank
- self.state.current_group_count += 1;
- }
- match self.rank_type {
- RankType::Basic => Ok(ScalarValue::UInt64(Some(
- self.state.last_rank_boundary as u64 + 1,
- ))),
- RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank
as u64))),
- RankType::Percent => {
- exec_err!("Can not execute PERCENT_RANK in a streaming
fashion")
- }
- }
- }
-
- fn evaluate_all_with_rank(
- &self,
- num_rows: usize,
- ranks_in_partition: &[Range<usize>],
- ) -> Result<ArrayRef> {
- // see https://www.postgresql.org/docs/current/functions-window.html
- let result: ArrayRef = match self.rank_type {
- RankType::Dense => Arc::new(UInt64Array::from_iter_values(
- ranks_in_partition
- .iter()
- .zip(1u64..)
- .flat_map(|(range, rank)| {
- let len = range.end - range.start;
- iter::repeat(rank).take(len)
- }),
- )),
- RankType::Percent => {
- // Returns the relative rank of the current row, that is (rank
- 1) / (total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
- let denominator = num_rows as f64;
- Arc::new(Float64Array::from_iter_values(
- ranks_in_partition
- .iter()
- .scan(0_u64, |acc, range| {
- let len = range.end - range.start;
- let value = (*acc as f64) / (denominator -
1.0).max(1.0);
- let result = iter::repeat(value).take(len);
- *acc += len as u64;
- Some(result)
- })
- .flatten(),
- ))
- }
- RankType::Basic => Arc::new(UInt64Array::from_iter_values(
- ranks_in_partition
- .iter()
- .scan(1_u64, |acc, range| {
- let len = range.end - range.start;
- let result = iter::repeat(*acc).take(len);
- *acc += len as u64;
- Some(result)
- })
- .flatten(),
- )),
- };
- Ok(result)
- }
-
- fn supports_bounded_execution(&self) -> bool {
- matches!(self.rank_type, RankType::Basic | RankType::Dense)
- }
-
- fn include_rank(&self) -> bool {
- true
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use datafusion_common::cast::{as_float64_array, as_uint64_array};
-
- fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
- test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
- }
-
- #[allow(clippy::single_range_in_vec_init)]
- fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
- test_i32_result(expr, vec![0..8], expected)
- }
-
- fn test_f64_result(
- expr: &Rank,
- num_rows: usize,
- ranks: Vec<Range<usize>>,
- expected: Vec<f64>,
- ) -> Result<()> {
- let result = expr
- .create_evaluator()?
- .evaluate_all_with_rank(num_rows, &ranks)?;
- let result = as_float64_array(&result)?;
- let result = result.values();
- assert_eq!(expected, *result);
- Ok(())
- }
-
- fn test_i32_result(
- expr: &Rank,
- ranks: Vec<Range<usize>>,
- expected: Vec<u64>,
- ) -> Result<()> {
- let result = expr.create_evaluator()?.evaluate_all_with_rank(8,
&ranks)?;
- let result = as_uint64_array(&result)?;
- let result = result.values();
- assert_eq!(expected, *result);
- Ok(())
- }
-
- #[test]
- fn test_dense_rank() -> Result<()> {
- let r = dense_rank("arr".into(), &DataType::UInt64);
- test_without_rank(&r, vec![1; 8])?;
- test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
- Ok(())
- }
-
- #[test]
- fn test_rank() -> Result<()> {
- let r = rank("arr".into(), &DataType::UInt64);
- test_without_rank(&r, vec![1; 8])?;
- test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
- Ok(())
- }
-
- #[test]
- #[allow(clippy::single_range_in_vec_init)]
- fn test_percent_rank() -> Result<()> {
- let r = percent_rank("arr".into(), &DataType::Float64);
-
- // empty case
- let expected = vec![0.0; 0];
- test_f64_result(&r, 0, vec![0..0; 0], expected)?;
-
- // singleton case
- let expected = vec![0.0];
- test_f64_result(&r, 1, vec![0..1], expected)?;
-
- // uniform case
- let expected = vec![0.0; 7];
- test_f64_result(&r, 7, vec![0..7], expected)?;
-
- // non-trivial case
- let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
- test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;
-
- Ok(())
- }
-}
diff --git a/datafusion/physical-expr/src/window/window_expr.rs
b/datafusion/physical-expr/src/window/window_expr.rs
index 8f6f78df8c..46c46fab68 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -530,19 +530,6 @@ pub enum WindowFn {
Aggregate(Box<dyn Accumulator>),
}
-/// State for the RANK(percent_rank, rank, dense_rank) built-in window
function.
-#[derive(Debug, Clone, Default)]
-pub struct RankState {
- /// The last values for rank as these values change, we increase n_rank
- pub last_rank_data: Option<Vec<ScalarValue>>,
- /// The index where last_rank_boundary is started
- pub last_rank_boundary: usize,
- /// Keep the number of entries in current rank
- pub current_group_count: usize,
- /// Rank number kept from the start
- pub n_rank: usize,
-}
-
/// Tag to differentiate special use cases of the NTH_VALUE built-in window
function.
#[derive(Debug, Copy, Clone)]
pub enum NthValueKind {
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index ff5085a6d9..6f7d95bf95 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -21,10 +21,7 @@ use std::borrow::Borrow;
use std::sync::Arc;
use crate::{
- expressions::{
- cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal,
NthValue, Ntile,
- PhysicalSortExpr,
- },
+ expressions::{cume_dist, lag, lead, Literal, NthValue, Ntile,
PhysicalSortExpr},
ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr,
};
@@ -231,9 +228,6 @@ fn create_built_in_window_expr(
let out_data_type: &DataType =
input_schema.field_with_name(&name)?.data_type();
Ok(match fun {
- BuiltInWindowFunction::Rank => Arc::new(rank(name, out_data_type)),
- BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name,
out_data_type)),
- BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name,
out_data_type)),
BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name,
out_data_type)),
BuiltInWindowFunction::Ntile => {
let n = get_scalar_value_from_args(args, 0)?.ok_or_else(|| {
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index e36c91e7d0..803cb49919 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -508,9 +508,9 @@ message ScalarUDFExprNode {
enum BuiltInWindowFunction {
UNSPECIFIED = 0; //
https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum
// ROW_NUMBER = 0;
- RANK = 1;
- DENSE_RANK = 2;
- PERCENT_RANK = 3;
+ // RANK = 1;
+ // DENSE_RANK = 2;
+ // PERCENT_RANK = 3;
CUME_DIST = 4;
NTILE = 5;
LAG = 6;
@@ -739,7 +739,7 @@ message FileSinkConfig {
datafusion_common.Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
bool keep_partition_by_columns = 9;
- InsertOp insert_op = 10;
+ InsertOp insert_op = 10;
}
enum InsertOp {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 004798b3ba..c7d4c4561a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -1662,9 +1662,6 @@ impl serde::Serialize for BuiltInWindowFunction {
{
let variant = match self {
Self::Unspecified => "UNSPECIFIED",
- Self::Rank => "RANK",
- Self::DenseRank => "DENSE_RANK",
- Self::PercentRank => "PERCENT_RANK",
Self::CumeDist => "CUME_DIST",
Self::Ntile => "NTILE",
Self::Lag => "LAG",
@@ -1684,9 +1681,6 @@ impl<'de> serde::Deserialize<'de> for
BuiltInWindowFunction {
{
const FIELDS: &[&str] = &[
"UNSPECIFIED",
- "RANK",
- "DENSE_RANK",
- "PERCENT_RANK",
"CUME_DIST",
"NTILE",
"LAG",
@@ -1735,9 +1729,6 @@ impl<'de> serde::Deserialize<'de> for
BuiltInWindowFunction {
{
match value {
"UNSPECIFIED" => Ok(BuiltInWindowFunction::Unspecified),
- "RANK" => Ok(BuiltInWindowFunction::Rank),
- "DENSE_RANK" => Ok(BuiltInWindowFunction::DenseRank),
- "PERCENT_RANK" => Ok(BuiltInWindowFunction::PercentRank),
"CUME_DIST" => Ok(BuiltInWindowFunction::CumeDist),
"NTILE" => Ok(BuiltInWindowFunction::Ntile),
"LAG" => Ok(BuiltInWindowFunction::Lag),
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 436347330d..8cba6f84f7 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1819,9 +1819,9 @@ pub enum BuiltInWindowFunction {
/// <https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum>
Unspecified = 0,
/// ROW_NUMBER = 0;
- Rank = 1,
- DenseRank = 2,
- PercentRank = 3,
+ /// RANK = 1;
+ /// DENSE_RANK = 2;
+ /// PERCENT_RANK = 3;
CumeDist = 4,
Ntile = 5,
Lag = 6,
@@ -1838,9 +1838,6 @@ impl BuiltInWindowFunction {
pub fn as_str_name(&self) -> &'static str {
match self {
Self::Unspecified => "UNSPECIFIED",
- Self::Rank => "RANK",
- Self::DenseRank => "DENSE_RANK",
- Self::PercentRank => "PERCENT_RANK",
Self::CumeDist => "CUME_DIST",
Self::Ntile => "NTILE",
Self::Lag => "LAG",
@@ -1854,9 +1851,6 @@ impl BuiltInWindowFunction {
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"UNSPECIFIED" => Some(Self::Unspecified),
- "RANK" => Some(Self::Rank),
- "DENSE_RANK" => Some(Self::DenseRank),
- "PERCENT_RANK" => Some(Self::PercentRank),
"CUME_DIST" => Some(Self::CumeDist),
"NTILE" => Some(Self::Ntile),
"LAG" => Some(Self::Lag),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 893255ccc8..32e1b68203 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -142,9 +142,6 @@ impl From<protobuf::BuiltInWindowFunction> for
BuiltInWindowFunction {
fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self {
match built_in_function {
protobuf::BuiltInWindowFunction::Unspecified => todo!(),
- protobuf::BuiltInWindowFunction::Rank => Self::Rank,
- protobuf::BuiltInWindowFunction::PercentRank => Self::PercentRank,
- protobuf::BuiltInWindowFunction::DenseRank => Self::DenseRank,
protobuf::BuiltInWindowFunction::Lag => Self::Lag,
protobuf::BuiltInWindowFunction::Lead => Self::Lead,
protobuf::BuiltInWindowFunction::FirstValue => Self::FirstValue,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 63d1a007c1..07823b422f 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -119,11 +119,8 @@ impl From<&BuiltInWindowFunction> for
protobuf::BuiltInWindowFunction {
BuiltInWindowFunction::NthValue => Self::NthValue,
BuiltInWindowFunction::Ntile => Self::Ntile,
BuiltInWindowFunction::CumeDist => Self::CumeDist,
- BuiltInWindowFunction::PercentRank => Self::PercentRank,
- BuiltInWindowFunction::Rank => Self::Rank,
BuiltInWindowFunction::Lag => Self::Lag,
BuiltInWindowFunction::Lead => Self::Lead,
- BuiltInWindowFunction::DenseRank => Self::DenseRank,
}
}
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 6f6065a1c2..85d4fe8a16 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -24,8 +24,8 @@ use datafusion::physical_expr::window::{NthValueKind,
SlidingAggregateWindowExpr
use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, InListExpr,
IsNotNullExpr,
- IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, Rank,
RankType,
- TryCastExpr, WindowShift,
+ IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, TryCastExpr,
+ WindowShift,
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
@@ -109,14 +109,7 @@ pub fn serialize_physical_window_expr(
let expr = built_in_window_expr.get_built_in_func_expr();
let built_in_fn_expr = expr.as_any();
- let builtin_fn = if let Some(rank_expr) =
built_in_fn_expr.downcast_ref::<Rank>()
- {
- match rank_expr.get_type() {
- RankType::Basic => protobuf::BuiltInWindowFunction::Rank,
- RankType::Dense => protobuf::BuiltInWindowFunction::DenseRank,
- RankType::Percent =>
protobuf::BuiltInWindowFunction::PercentRank,
- }
- } else if built_in_fn_expr.downcast_ref::<CumeDist>().is_some() {
+ let builtin_fn = if
built_in_fn_expr.downcast_ref::<CumeDist>().is_some() {
protobuf::BuiltInWindowFunction::CumeDist
} else if let Some(ntile_expr) =
built_in_fn_expr.downcast_ref::<Ntile>() {
args.insert(
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 46b1af3d32..8c9b368598 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -47,6 +47,9 @@ use datafusion::functions_aggregate::expr_fn::{
};
use datafusion::functions_aggregate::min_max::max_udaf;
use datafusion::functions_nested::map::map;
+use datafusion::functions_window::dense_rank::dense_rank;
+use datafusion::functions_window::percent_rank::percent_rank;
+use datafusion::functions_window::rank::{rank, rank_udwf};
use datafusion::functions_window::row_number::row_number;
use datafusion::prelude::*;
use datafusion::test_util::{TestTableFactory, TestTableProvider};
@@ -938,6 +941,9 @@ async fn roundtrip_expr_api() -> Result<()> {
vec![lit(10), lit(20), lit(30)],
),
row_number(),
+ rank(),
+ dense_rank(),
+ percent_rank(),
nth_value(col("b"), 1, vec![]),
nth_value(
col("b"),
@@ -2305,9 +2311,7 @@ fn roundtrip_window() {
// 1. without window_frame
let test_expr1 = Expr::WindowFunction(expr::WindowFunction::new(
- WindowFunctionDefinition::BuiltInWindowFunction(
- datafusion_expr::BuiltInWindowFunction::Rank,
- ),
+ WindowFunctionDefinition::WindowUDF(rank_udwf()),
vec![],
))
.partition_by(vec![col("col1")])
@@ -2318,9 +2322,7 @@ fn roundtrip_window() {
// 2. with default window_frame
let test_expr2 = Expr::WindowFunction(expr::WindowFunction::new(
- WindowFunctionDefinition::BuiltInWindowFunction(
- datafusion_expr::BuiltInWindowFunction::Rank,
- ),
+ WindowFunctionDefinition::WindowUDF(rank_udwf()),
vec![],
))
.partition_by(vec![col("col1")])
@@ -2337,9 +2339,7 @@ fn roundtrip_window() {
);
let test_expr3 = Expr::WindowFunction(expr::WindowFunction::new(
- WindowFunctionDefinition::BuiltInWindowFunction(
- datafusion_expr::BuiltInWindowFunction::Rank,
- ),
+ WindowFunctionDefinition::WindowUDF(rank_udwf()),
vec![],
))
.partition_by(vec![col("col1")])
diff --git a/datafusion/sql/tests/common/mod.rs
b/datafusion/sql/tests/common/mod.rs
index fe0e5f7283..47caeec78d 100644
--- a/datafusion/sql/tests/common/mod.rs
+++ b/datafusion/sql/tests/common/mod.rs
@@ -54,6 +54,7 @@ pub(crate) struct MockSessionState {
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
expr_planners: Vec<Arc<dyn ExprPlanner>>,
+ window_functions: HashMap<String, Arc<WindowUDF>>,
pub config_options: ConfigOptions,
}
@@ -80,6 +81,12 @@ impl MockSessionState {
);
self
}
+
+ pub fn with_window_function(mut self, window_function: Arc<WindowUDF>) ->
Self {
+ self.window_functions
+ .insert(window_function.name().to_string(), window_function);
+ self
+ }
}
pub(crate) struct MockContextProvider {
@@ -217,8 +224,8 @@ impl ContextProvider for MockContextProvider {
unimplemented!()
}
- fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
- None
+ fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
+ self.state.window_functions.get(name).cloned()
}
fn options(&self) -> &ConfigOptions {
diff --git a/datafusion/sql/tests/sql_integration.rs
b/datafusion/sql/tests/sql_integration.rs
index 44b591fede..19f3d31321 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -48,6 +48,7 @@ use datafusion_functions_aggregate::{
min_max::min_udaf,
};
use datafusion_functions_aggregate::{average::avg_udaf,
grouping::grouping_udaf};
+use datafusion_functions_window::rank::rank_udwf;
use rstest::rstest;
use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
@@ -2633,6 +2634,7 @@ fn logical_plan_with_dialect_and_options(
.with_aggregate_function(min_udaf())
.with_aggregate_function(max_udaf())
.with_aggregate_function(grouping_udaf())
+ .with_window_function(rank_udwf())
.with_expr_planner(Arc::new(CoreFunctionPlanner::default()));
let context = MockContextProvider { state };
@@ -3059,8 +3061,8 @@ fn rank_partition_grouping() {
from
person
group by rollup(state, last_name)";
- let expected = "Projection: sum(person.age) AS total_sum, person.state,
person.last_name, grouping(person.state) + grouping(person.last_name) AS x,
RANK() PARTITION BY [grouping(person.state) + grouping(person.last_name), CASE
WHEN grouping(person.last_name) = Int64(0) THEN person.state END] ORDER BY
[sum(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS the_rank\
- \n WindowAggr: windowExpr=[[RANK() PARTITION BY
[grouping(person.state) + grouping(person.last_name), CASE WHEN
grouping(person.last_name) = Int64(0) THEN person.state END] ORDER BY
[sum(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]\
+ let expected = "Projection: sum(person.age) AS total_sum, person.state,
person.last_name, grouping(person.state) + grouping(person.last_name) AS x,
rank() PARTITION BY [grouping(person.state) + grouping(person.last_name), CASE
WHEN grouping(person.last_name) = Int64(0) THEN person.state END] ORDER BY
[sum(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS the_rank\
+ \n WindowAggr: windowExpr=[[rank() PARTITION BY
[grouping(person.state) + grouping(person.last_name), CASE WHEN
grouping(person.last_name) = Int64(0) THEN person.state END] ORDER BY
[sum(person.age) DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]\
\n Aggregate: groupBy=[[ROLLUP (person.state, person.last_name)]],
aggr=[[sum(person.age), grouping(person.state), grouping(person.last_name)]]\
\n TableScan: person";
quick_test(sql, expected);
diff --git a/datafusion/sqllogictest/test_files/subquery_sort.slt
b/datafusion/sqllogictest/test_files/subquery_sort.slt
index 17affbc0ac..e4360a9269 100644
--- a/datafusion/sqllogictest/test_files/subquery_sort.slt
+++ b/datafusion/sqllogictest/test_files/subquery_sort.slt
@@ -93,14 +93,14 @@ logical_plan
02)--Sort: t2.c1 ASC NULLS LAST, t2.c3 ASC NULLS LAST, t2.c9 ASC NULLS LAST
03)----SubqueryAlias: t2
04)------Sort: sink_table.c1 ASC NULLS LAST, sink_table.c3 ASC NULLS LAST,
fetch=2
-05)--------Projection: sink_table.c1, RANK() ORDER BY [sink_table.c1 DESC
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS r,
sink_table.c3, sink_table.c9
-06)----------WindowAggr: windowExpr=[[RANK() ORDER BY [sink_table.c1 DESC
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+05)--------Projection: sink_table.c1, rank() ORDER BY [sink_table.c1 DESC
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS r,
sink_table.c3, sink_table.c9
+06)----------WindowAggr: windowExpr=[[rank() ORDER BY [sink_table.c1 DESC
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
07)------------TableScan: sink_table projection=[c1, c3, c9]
physical_plan
01)ProjectionExec: expr=[c1@0 as c1, r@1 as r]
02)--SortExec: TopK(fetch=2), expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS
LAST,c9@3 ASC NULLS LAST], preserve_partitioning=[false]
-03)----ProjectionExec: expr=[c1@0 as c1, RANK() ORDER BY [sink_table.c1 DESC
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as r, c3@1 as
c3, c9@2 as c9]
-04)------BoundedWindowAggExec: wdw=[RANK() ORDER BY [sink_table.c1 DESC NULLS
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"RANK() ORDER BY [sink_table.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Utf8(NULL)), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]
+03)----ProjectionExec: expr=[c1@0 as c1, rank() ORDER BY [sink_table.c1 DESC
NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as r, c3@1 as
c3, c9@2 as c9]
+04)------BoundedWindowAggExec: wdw=[rank() ORDER BY [sink_table.c1 DESC NULLS
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"rank() ORDER BY [sink_table.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Utf8(NULL)), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]
05)--------SortExec: expr=[c1@0 DESC], preserve_partitioning=[false]
06)----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c3, c9], has_header=true
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index cb6c6a5ace..40309a1f2d 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2636,14 +2636,14 @@ EXPLAIN SELECT
----
logical_plan
01)Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5
-02)--Projection: annotated_data_finite.ts,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2,
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1,
last_value(annot [...]
-03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER [...]
+02)--Projection: annotated_data_finite.ts,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2,
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1,
last_value(annot [...]
+03)----WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER [...]
04)------WindowAggr: windowExpr=[[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING, first_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, last_value(annotated_data_finite.inc_col [...]
05)--------TableScan: annotated_data_finite projection=[ts, inc_col]
physical_plan
01)SortExec: TopK(fetch=5), expr=[ts@0 DESC], preserve_partitioning=[false]
-02)--ProjectionExec: expr=[ts@0 as ts,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2,
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1,
last_value( [...]
-03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(10)), end_bo [...]
+02)--ProjectionExec: expr=[ts@0 as ts,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1,
first_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2,
last_value(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1,
last_value( [...]
+03)----BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(10)), end_bo [...]
04)------BoundedWindowAggExec: wdw=[first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "first_value(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(1)), e [...]
05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts,
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
diff --git a/datafusion/substrait/tests/cases/serialize.rs
b/datafusion/substrait/tests/cases/serialize.rs
index da0898d222..72d685817d 100644
--- a/datafusion/substrait/tests/cases/serialize.rs
+++ b/datafusion/substrait/tests/cases/serialize.rs
@@ -117,8 +117,8 @@ mod tests {
let datafusion_plan = df.into_optimized_plan()?;
assert_eq!(
format!("{}", datafusion_plan),
- "Projection: data.b, RANK() PARTITION BY [data.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, data.c\
- \n WindowAggr: windowExpr=[[RANK() PARTITION BY [data.a] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
+ "Projection: data.b, rank() PARTITION BY [data.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, data.c\
+ \n WindowAggr: windowExpr=[[rank() PARTITION BY [data.a] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: data projection=[a, b, c]",
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]