This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1eb5d55 add cume_dist implementation (#1076)
1eb5d55 is described below
commit 1eb5d554b4a8436cb206870caa320956d7c7cfde
Author: Jiayu Liu <[email protected]>
AuthorDate: Fri Oct 8 23:06:06 2021 +0800
add cume_dist implementation (#1076)
---
.../expressions/{rank.rs => cume_dist.rs} | 126 ++++++++++-----------
datafusion/src/physical_plan/expressions/mod.rs | 2 +
datafusion/src/physical_plan/expressions/rank.rs | 3 +-
datafusion/src/physical_plan/windows/mod.rs | 4 +-
.../simple_window_ranked_built_in_functions.sql | 1 +
5 files changed, 65 insertions(+), 71 deletions(-)
diff --git a/datafusion/src/physical_plan/expressions/rank.rs
b/datafusion/src/physical_plan/expressions/cume_dist.rs
similarity index 53%
copy from datafusion/src/physical_plan/expressions/rank.rs
copy to datafusion/src/physical_plan/expressions/cume_dist.rs
index b88dec3..7b0a45a 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/cume_dist.rs
@@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines physical expressions that can evaluated at runtime during query
execution
+//! Defines physical expression for `cume_dist` that can be evaluated
+//! at runtime during query execution
use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr,
PhysicalExpr};
use arrow::array::ArrayRef;
-use arrow::array::UInt64Array;
+use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use std::any::Any;
@@ -29,24 +30,18 @@ use std::iter;
use std::ops::Range;
use std::sync::Arc;
-/// Rank calculates the rank in the window function with order by
+/// CumeDist computes `cume_dist` for an ordered window: the fraction of partition rows ordered at or before the current row, peers included
#[derive(Debug)]
-pub struct Rank {
+pub struct CumeDist {
name: String,
- dense: bool,
}
-/// Create a rank window function
-pub fn rank(name: String) -> Rank {
- Rank { name, dense: false }
+/// Create a cume_dist window function
+pub fn cume_dist(name: String) -> CumeDist {
+ CumeDist { name }
}
-/// Create a dense rank window function
-pub fn dense_rank(name: String) -> Rank {
- Rank { name, dense: true }
-}
-
-impl BuiltInWindowFunctionExpr for Rank {
+impl BuiltInWindowFunctionExpr for CumeDist {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
@@ -54,7 +49,7 @@ impl BuiltInWindowFunctionExpr for Rank {
fn field(&self) -> Result<Field> {
let nullable = false;
- let data_type = DataType::UInt64;
+ let data_type = DataType::Float64;
Ok(Field::new(self.name(), data_type, nullable))
}
@@ -70,48 +65,41 @@ impl BuiltInWindowFunctionExpr for Rank {
&self,
_batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
- Ok(Box::new(RankEvaluator { dense: self.dense }))
+ Ok(Box::new(CumeDistEvaluator {}))
}
}
-pub(crate) struct RankEvaluator {
- dense: bool,
-}
+pub(crate) struct CumeDistEvaluator;
-impl PartitionEvaluator for RankEvaluator {
+impl PartitionEvaluator for CumeDistEvaluator {
fn include_rank(&self) -> bool {
true
}
fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>
{
- unreachable!("rank evaluation must be called with
evaluate_partition_with_rank")
+ unreachable!(
+ "cume_dist evaluation must be called with
evaluate_partition_with_rank"
+ )
}
fn evaluate_partition_with_rank(
&self,
- _partition: Range<usize>,
+ partition: Range<usize>,
ranks_in_partition: &[Range<usize>],
) -> Result<ArrayRef> {
- let result = if self.dense {
-
UInt64Array::from_iter_values(ranks_in_partition.iter().zip(1u64..).flat_map(
- |(range, rank)| {
+ let scaler = (partition.end - partition.start) as f64;
+ let result = Float64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .scan(0_u64, |acc, range| {
let len = range.end - range.start;
- iter::repeat(rank).take(len)
- },
- ))
- } else {
- UInt64Array::from_iter_values(
- ranks_in_partition
- .iter()
- .scan(1_u64, |acc, range| {
- let len = range.end - range.start;
- let result = iter::repeat(*acc).take(len);
- *acc += len as u64;
- Some(result)
- })
- .flatten(),
- )
- };
+ *acc += len as u64;
+ let value: f64 = (*acc as f64) / scaler;
+ let result = iter::repeat(value).take(len);
+ Some(result)
+ })
+ .flatten(),
+ );
Ok(Arc::new(result))
}
}
@@ -121,24 +109,12 @@ mod tests {
use super::*;
use arrow::{array::*, datatypes::*};
- fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
- test_i32_result(
- expr,
- vec![-2, -2, 1, 3, 3, 3, 7, 8],
- vec![0..2, 2..3, 3..6, 6..7, 7..8],
- expected,
- )
- }
-
- fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
- test_i32_result(expr, vec![-2, -2, 1, 3, 3, 3, 7, 8], vec![0..8],
expected)
- }
-
fn test_i32_result(
- expr: &Rank,
+ expr: &CumeDist,
data: Vec<i32>,
+ partition: Range<usize>,
ranks: Vec<Range<usize>>,
- expected: Vec<u64>,
+ expected: Vec<f64>,
) -> Result<()> {
let arr: ArrayRef = Arc::new(Int32Array::from(data));
let values = vec![arr];
@@ -146,27 +122,39 @@ mod tests {
let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr
.create_evaluator(&batch)?
- .evaluate_with_rank(vec![0..8], ranks)?;
+ .evaluate_with_rank(vec![partition], ranks)?;
assert_eq!(1, result.len());
- let result = result[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+ let result =
result[0].as_any().downcast_ref::<Float64Array>().unwrap();
let result = result.values();
assert_eq!(expected, result);
Ok(())
}
#[test]
- fn test_dense_rank() -> Result<()> {
- let r = dense_rank("arr".into());
- test_without_rank(&r, vec![1; 8])?;
- test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
- Ok(())
- }
+ fn test_cume_dist() -> Result<()> {
+ let r = cume_dist("arr".into());
+
+ let expected = vec![0.0; 0];
+ test_i32_result(&r, vec![], 0..0, vec![], expected)?;
+
+ let expected = vec![1.0; 1];
+ test_i32_result(&r, vec![20; 1], 0..1, vec![0..1], expected)?;
+
+ let expected = vec![1.0; 2];
+ test_i32_result(&r, vec![20; 2], 0..2, vec![0..2], expected)?;
+
+ let expected = vec![0.5, 0.5, 1.0, 1.0];
+ test_i32_result(&r, vec![1, 1, 2, 2], 0..4, vec![0..2, 2..4],
expected)?;
+
+ let expected = vec![0.25, 0.5, 0.75, 1.0];
+ test_i32_result(
+ &r,
+ vec![1, 2, 4, 5],
+ 0..4,
+ vec![0..1, 1..2, 2..3, 3..4],
+ expected,
+ )?;
- #[test]
- fn test_rank() -> Result<()> {
- let r = rank("arr".into());
- test_without_rank(&r, vec![1; 8])?;
- test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
Ok(())
}
}
diff --git a/datafusion/src/physical_plan/expressions/mod.rs
b/datafusion/src/physical_plan/expressions/mod.rs
index 5a5a118..9c05112 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -33,6 +33,7 @@ mod cast;
mod coercion;
mod column;
mod count;
+mod cume_dist;
mod in_list;
mod is_not_null;
mod is_null;
@@ -62,6 +63,7 @@ pub use cast::{
};
pub use column::{col, Column};
pub use count::Count;
+pub use cume_dist::cume_dist;
pub use in_list::{in_list, InListExpr};
pub use is_not_null::{is_not_null, IsNotNullExpr};
pub use is_null::{is_null, IsNullExpr};
diff --git a/datafusion/src/physical_plan/expressions/rank.rs
b/datafusion/src/physical_plan/expressions/rank.rs
index b88dec3..b82cfa4 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/rank.rs
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines physical expressions that can evaluated at runtime during query
execution
+//! Defines physical expressions for `rank` and `dense_rank` that can be evaluated
+//! at runtime during query execution
use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
diff --git a/datafusion/src/physical_plan/windows/mod.rs
b/datafusion/src/physical_plan/windows/mod.rs
index 7bd8312..f34649f 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -22,7 +22,8 @@ use crate::logical_plan::window_frames::WindowFrame;
use crate::physical_plan::{
aggregates,
expressions::{
- dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr,
RowNumber,
+ cume_dist, dense_rank, lag, lead, rank, Literal, NthValue,
PhysicalSortExpr,
+ RowNumber,
},
type_coercion::coerce,
window_functions::{
@@ -95,6 +96,7 @@ fn create_built_in_window_expr(
BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
BuiltInWindowFunction::Rank => Arc::new(rank(name)),
BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
+ BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)),
BuiltInWindowFunction::Lag => {
let coerced_args = coerce(args, input_schema,
&signature_for_built_in(fun))?;
let arg = coerced_args[0].clone();
diff --git a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
index 0ea6b04..adffcbf 100644
--- a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
+++ b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
@@ -16,6 +16,7 @@
select
c9,
+ cume_dist() OVER (PARTITION BY c2 ORDER BY c3) cume_dist_by_c3,
rank() OVER (PARTITION BY c2 ORDER BY c3) rank_by_c3,
dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3
FROM test