This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eb5d55  add cume_dist implementation (#1076)
1eb5d55 is described below

commit 1eb5d554b4a8436cb206870caa320956d7c7cfde
Author: Jiayu Liu <[email protected]>
AuthorDate: Fri Oct 8 23:06:06 2021 +0800

    add cume_dist implementation (#1076)
---
 .../expressions/{rank.rs => cume_dist.rs}          | 126 ++++++++++-----------
 datafusion/src/physical_plan/expressions/mod.rs    |   2 +
 datafusion/src/physical_plan/expressions/rank.rs   |   3 +-
 datafusion/src/physical_plan/windows/mod.rs        |   4 +-
 .../simple_window_ranked_built_in_functions.sql    |   1 +
 5 files changed, 65 insertions(+), 71 deletions(-)

diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/cume_dist.rs
similarity index 53%
copy from datafusion/src/physical_plan/expressions/rank.rs
copy to datafusion/src/physical_plan/expressions/cume_dist.rs
index b88dec3..7b0a45a 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/cume_dist.rs
@@ -15,13 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines physical expressions that can evaluated at runtime during query execution
+//! Defines physical expression for `cume_dist` that can be evaluated
+//! at runtime during query execution
 
 use crate::error::Result;
 use crate::physical_plan::window_functions::PartitionEvaluator;
 use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
 use arrow::array::ArrayRef;
-use arrow::array::UInt64Array;
+use arrow::array::Float64Array;
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
 use std::any::Any;
@@ -29,24 +30,18 @@ use std::iter;
 use std::ops::Range;
 use std::sync::Arc;
 
-/// Rank calculates the rank in the window function with order by
+/// CumeDist calculates the cume_dist in the window function with order by
 #[derive(Debug)]
-pub struct Rank {
+pub struct CumeDist {
     name: String,
-    dense: bool,
 }
 
-/// Create a rank window function
-pub fn rank(name: String) -> Rank {
-    Rank { name, dense: false }
+/// Create a cume_dist window function
+pub fn cume_dist(name: String) -> CumeDist {
+    CumeDist { name }
 }
 
-/// Create a dense rank window function
-pub fn dense_rank(name: String) -> Rank {
-    Rank { name, dense: true }
-}
-
-impl BuiltInWindowFunctionExpr for Rank {
+impl BuiltInWindowFunctionExpr for CumeDist {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
         self
@@ -54,7 +49,7 @@ impl BuiltInWindowFunctionExpr for Rank {
 
     fn field(&self) -> Result<Field> {
         let nullable = false;
-        let data_type = DataType::UInt64;
+        let data_type = DataType::Float64;
         Ok(Field::new(self.name(), data_type, nullable))
     }
 
@@ -70,48 +65,41 @@ impl BuiltInWindowFunctionExpr for Rank {
         &self,
         _batch: &RecordBatch,
     ) -> Result<Box<dyn PartitionEvaluator>> {
-        Ok(Box::new(RankEvaluator { dense: self.dense }))
+        Ok(Box::new(CumeDistEvaluator {}))
     }
 }
 
-pub(crate) struct RankEvaluator {
-    dense: bool,
-}
+pub(crate) struct CumeDistEvaluator;
 
-impl PartitionEvaluator for RankEvaluator {
+impl PartitionEvaluator for CumeDistEvaluator {
     fn include_rank(&self) -> bool {
         true
     }
 
     fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
-        unreachable!("rank evaluation must be called with evaluate_partition_with_rank")
+        unreachable!(
+            "cume_dist evaluation must be called with evaluate_partition_with_rank"
+        )
     }
 
     fn evaluate_partition_with_rank(
         &self,
-        _partition: Range<usize>,
+        partition: Range<usize>,
         ranks_in_partition: &[Range<usize>],
     ) -> Result<ArrayRef> {
-        let result = if self.dense {
-            UInt64Array::from_iter_values(ranks_in_partition.iter().zip(1u64..).flat_map(
-                |(range, rank)| {
+        let scaler = (partition.end - partition.start) as f64;
+        let result = Float64Array::from_iter_values(
+            ranks_in_partition
+                .iter()
+                .scan(0_u64, |acc, range| {
                     let len = range.end - range.start;
-                    iter::repeat(rank).take(len)
-                },
-            ))
-        } else {
-            UInt64Array::from_iter_values(
-                ranks_in_partition
-                    .iter()
-                    .scan(1_u64, |acc, range| {
-                        let len = range.end - range.start;
-                        let result = iter::repeat(*acc).take(len);
-                        *acc += len as u64;
-                        Some(result)
-                    })
-                    .flatten(),
-            )
-        };
+                    *acc += len as u64;
+                    let value: f64 = (*acc as f64) / scaler;
+                    let result = iter::repeat(value).take(len);
+                    Some(result)
+                })
+                .flatten(),
+        );
         Ok(Arc::new(result))
     }
 }
@@ -121,24 +109,12 @@ mod tests {
     use super::*;
     use arrow::{array::*, datatypes::*};
 
-    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
-        test_i32_result(
-            expr,
-            vec![-2, -2, 1, 3, 3, 3, 7, 8],
-            vec![0..2, 2..3, 3..6, 6..7, 7..8],
-            expected,
-        )
-    }
-
-    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
-        test_i32_result(expr, vec![-2, -2, 1, 3, 3, 3, 7, 8], vec![0..8], expected)
-    }
-
     fn test_i32_result(
-        expr: &Rank,
+        expr: &CumeDist,
         data: Vec<i32>,
+        partition: Range<usize>,
         ranks: Vec<Range<usize>>,
-        expected: Vec<u64>,
+        expected: Vec<f64>,
     ) -> Result<()> {
         let arr: ArrayRef = Arc::new(Int32Array::from(data));
         let values = vec![arr];
@@ -146,27 +122,39 @@ mod tests {
         let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
         let result = expr
             .create_evaluator(&batch)?
-            .evaluate_with_rank(vec![0..8], ranks)?;
+            .evaluate_with_rank(vec![partition], ranks)?;
         assert_eq!(1, result.len());
-        let result = result[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+        let result = result[0].as_any().downcast_ref::<Float64Array>().unwrap();
         let result = result.values();
         assert_eq!(expected, result);
         Ok(())
     }
 
     #[test]
-    fn test_dense_rank() -> Result<()> {
-        let r = dense_rank("arr".into());
-        test_without_rank(&r, vec![1; 8])?;
-        test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
-        Ok(())
-    }
+    fn test_cume_dist() -> Result<()> {
+        let r = cume_dist("arr".into());
+
+        let expected = vec![0.0; 0];
+        test_i32_result(&r, vec![], 0..0, vec![], expected)?;
+
+        let expected = vec![1.0; 1];
+        test_i32_result(&r, vec![20; 1], 0..1, vec![0..1], expected)?;
+
+        let expected = vec![1.0; 2];
+        test_i32_result(&r, vec![20; 2], 0..2, vec![0..2], expected)?;
+
+        let expected = vec![0.5, 0.5, 1.0, 1.0];
+        test_i32_result(&r, vec![1, 1, 2, 2], 0..4, vec![0..2, 2..4], expected)?;
+
+        let expected = vec![0.25, 0.5, 0.75, 1.0];
+        test_i32_result(
+            &r,
+            vec![1, 2, 4, 5],
+            0..4,
+            vec![0..1, 1..2, 2..3, 3..4],
+            expected,
+        )?;
 
-    #[test]
-    fn test_rank() -> Result<()> {
-        let r = rank("arr".into());
-        test_without_rank(&r, vec![1; 8])?;
-        test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
         Ok(())
     }
 }
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index 5a5a118..9c05112 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -33,6 +33,7 @@ mod cast;
 mod coercion;
 mod column;
 mod count;
+mod cume_dist;
 mod in_list;
 mod is_not_null;
 mod is_null;
@@ -62,6 +63,7 @@ pub use cast::{
 };
 pub use column::{col, Column};
 pub use count::Count;
+pub use cume_dist::cume_dist;
 pub use in_list::{in_list, InListExpr};
 pub use is_not_null::{is_not_null, IsNotNullExpr};
 pub use is_null::{is_null, IsNullExpr};
diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs
index b88dec3..b82cfa4 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/rank.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines physical expressions that can evaluated at runtime during query execution
+//! Defines physical expression for `rank` and `dense_rank` that can be evaluated
+//! at runtime during query execution
 
 use crate::error::Result;
 use crate::physical_plan::window_functions::PartitionEvaluator;
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
index 7bd8312..f34649f 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -22,7 +22,8 @@ use crate::logical_plan::window_frames::WindowFrame;
 use crate::physical_plan::{
     aggregates,
     expressions::{
-        dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr, RowNumber,
+        cume_dist, dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr,
+        RowNumber,
     },
     type_coercion::coerce,
     window_functions::{
@@ -95,6 +96,7 @@ fn create_built_in_window_expr(
         BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
         BuiltInWindowFunction::Rank => Arc::new(rank(name)),
         BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
+        BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)),
         BuiltInWindowFunction::Lag => {
             let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
             let arg = coerced_args[0].clone();
diff --git a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
index 0ea6b04..adffcbf 100644
--- a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
+++ b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
@@ -16,6 +16,7 @@
 
 select
   c9,
+  cume_dist() OVER (PARTITION BY c2 ORDER BY c3) cume_dist_by_c3,
   rank() OVER (PARTITION BY c2 ORDER BY c3) rank_by_c3,
   dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3
 FROM test

Reply via email to