This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b218be67a Simplify windows builtin functions return type (#8920)
2b218be67a is described below

commit 2b218be67a6c412629530b812836a6cec76efc32
Author: comphead <[email protected]>
AuthorDate: Mon Jan 22 00:34:43 2024 -0800

    Simplify windows builtin functions return type (#8920)
    
    * Simplify windows builtin functions
    
    * add field comments
---
 datafusion/core/src/physical_planner.rs            | 25 ++++----
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    | 41 +++++++++++++-
 datafusion/expr/src/built_in_window_function.rs    |  4 +-
 datafusion/physical-expr/src/window/cume_dist.rs   | 14 +++--
 datafusion/physical-expr/src/window/lead_lag.rs    |  1 +
 datafusion/physical-expr/src/window/nth_value.rs   |  1 +
 datafusion/physical-expr/src/window/ntile.rs       | 13 +++--
 datafusion/physical-expr/src/window/rank.rs        | 23 ++++----
 datafusion/physical-expr/src/window/row_number.rs  | 16 ++++--
 datafusion/physical-plan/src/windows/mod.rs        | 42 ++++++++------
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  3 +-
 datafusion/sqllogictest/test_files/window.slt      | 66 ++++++++++++++++++++++
 12 files changed, 186 insertions(+), 63 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index bc448fe06f..ed92688559 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -86,6 +86,7 @@ use datafusion_expr::expr::{
 };
 use datafusion_expr::expr_rewriter::unnormalize_cols;
 use 
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
+use datafusion_expr::utils::exprlist_to_fields;
 use datafusion_expr::{
     DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
     StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
@@ -719,14 +720,16 @@ impl DefaultPhysicalPlanner {
                     }
 
                     let logical_input_schema = input.schema();
-                    let physical_input_schema = input_exec.schema();
+                    // Extend the schema to include window expression fields, as built-in window functions derive their data type from the incoming schema
+                    let mut window_fields = 
logical_input_schema.fields().clone();
+                    
window_fields.extend_from_slice(&exprlist_to_fields(window_expr.iter(), 
input)?);
+                    let extended_schema = 
&DFSchema::new_with_metadata(window_fields, HashMap::new())?;
                     let window_expr = window_expr
                         .iter()
                         .map(|e| {
                             create_window_expr(
                                 e,
-                                logical_input_schema,
-                                &physical_input_schema,
+                                extended_schema,
                                 session_state.execution_props(),
                             )
                         })
@@ -1529,7 +1532,7 @@ fn get_physical_expr_pair(
 /// queries like:
 /// OVER (ORDER BY a RANGES BETWEEN 3 PRECEDING AND 5 PRECEDING)
 /// OVER (ORDER BY a RANGES BETWEEN INTERVAL '3 DAY' PRECEDING AND '5 DAY' 
PRECEDING)  are rejected
-pub fn is_window_valid(window_frame: &WindowFrame) -> bool {
+pub fn is_window_frame_bound_valid(window_frame: &WindowFrame) -> bool {
     match (&window_frame.start_bound, &window_frame.end_bound) {
         (WindowFrameBound::Following(_), WindowFrameBound::Preceding(_))
         | (WindowFrameBound::Following(_), WindowFrameBound::CurrentRow)
@@ -1549,10 +1552,10 @@ pub fn create_window_expr_with_name(
     e: &Expr,
     name: impl Into<String>,
     logical_input_schema: &DFSchema,
-    physical_input_schema: &Schema,
     execution_props: &ExecutionProps,
 ) -> Result<Arc<dyn WindowExpr>> {
     let name = name.into();
+    let physical_input_schema: &Schema = &logical_input_schema.into();
     match e {
         Expr::WindowFunction(WindowFunction {
             fun,
@@ -1575,7 +1578,8 @@ pub fn create_window_expr_with_name(
                     create_physical_sort_expr(e, logical_input_schema, 
execution_props)
                 })
                 .collect::<Result<Vec<_>>>()?;
-            if !is_window_valid(window_frame) {
+
+            if !is_window_frame_bound_valid(window_frame) {
                 return plan_err!(
                         "Invalid window frame: start bound ({}) cannot be 
larger than end bound ({})",
                         window_frame.start_bound, window_frame.end_bound
@@ -1601,7 +1605,6 @@ pub fn create_window_expr_with_name(
 pub fn create_window_expr(
     e: &Expr,
     logical_input_schema: &DFSchema,
-    physical_input_schema: &Schema,
     execution_props: &ExecutionProps,
 ) -> Result<Arc<dyn WindowExpr>> {
     // unpack aliased logical expressions, e.g. "sum(col) over () as total"
@@ -1609,13 +1612,7 @@ pub fn create_window_expr(
         Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()),
         _ => (e.display_name()?, e),
     };
-    create_window_expr_with_name(
-        e,
-        name,
-        logical_input_schema,
-        physical_input_schema,
-        execution_props,
-    )
+    create_window_expr_with_name(e, name, logical_input_schema, 
execution_props)
 }
 
 type AggregateExprWithOptionalArgs = (
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 6e5c5f8eb9..4c440d6a5b 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -22,6 +22,7 @@ use arrow::compute::{concat_batches, SortOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{Field, Schema};
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::windows::{
@@ -37,6 +38,7 @@ use datafusion_expr::{
 };
 use datafusion_physical_expr::expressions::{cast, col, lit};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+use itertools::Itertools;
 use test_utils::add_empty_batches;
 
 use hashbrown::HashMap;
@@ -482,7 +484,6 @@ async fn run_window_test(
     let session_config = SessionConfig::new().with_batch_size(50);
     let ctx = SessionContext::new_with_config(session_config);
     let (window_fn, args, fn_name) = get_random_function(&schema, &mut rng, 
is_linear);
-
     let window_frame = get_random_window_frame(&mut rng, is_linear);
     let mut orderby_exprs = vec![];
     for column in &orderby_columns {
@@ -532,6 +533,40 @@ async fn run_window_test(
     if is_linear {
         exec1 = Arc::new(SortExec::new(sort_keys.clone(), exec1)) as _;
     }
+
+    // The schema needs to be enriched before calling `create_window_expr`,
+    // because window expression data types are derived from the schema.
+    // DataFusion enriches the schema in the physical planner; this test replicates that manually.
+    // Several functions take no input arguments, so an empty vec is passed for them.
+    let data_types = if [
+        "row_number",
+        "rank",
+        "dense_rank",
+        "percent_rank",
+        "ntile",
+        "cume_dist",
+    ]
+    .contains(&fn_name.as_str())
+    {
+        vec![]
+    } else {
+        args.iter()
+            .map(|e| e.clone().as_ref().data_type(&schema))
+            .collect::<Result<Vec<_>>>()?
+    };
+    let window_expr_return_type = window_fn.return_type(&data_types)?;
+    let mut window_fields = schema
+        .fields()
+        .iter()
+        .map(|f| f.as_ref().clone())
+        .collect_vec();
+    window_fields.extend_from_slice(&[Field::new(
+        &fn_name,
+        window_expr_return_type,
+        true,
+    )]);
+    let extended_schema = Arc::new(Schema::new(window_fields));
+
     let usual_window_exec = Arc::new(
         WindowAggExec::try_new(
             vec![create_window_expr(
@@ -541,7 +576,7 @@ async fn run_window_test(
                 &partitionby_exprs,
                 &orderby_exprs,
                 Arc::new(window_frame.clone()),
-                schema.as_ref(),
+                &extended_schema,
             )
             .unwrap()],
             exec1,
@@ -563,7 +598,7 @@ async fn run_window_test(
                 &partitionby_exprs,
                 &orderby_exprs,
                 Arc::new(window_frame.clone()),
-                schema.as_ref(),
+                extended_schema.as_ref(),
             )
             .unwrap()],
             exec2,
diff --git a/datafusion/expr/src/built_in_window_function.rs 
b/datafusion/expr/src/built_in_window_function.rs
index a03e3d2d24..f4b1cd03db 100644
--- a/datafusion/expr/src/built_in_window_function.rs
+++ b/datafusion/expr/src/built_in_window_function.rs
@@ -133,11 +133,11 @@ impl BuiltInWindowFunction {
         match self {
             BuiltInWindowFunction::RowNumber
             | BuiltInWindowFunction::Rank
-            | BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
+            | BuiltInWindowFunction::DenseRank
+            | BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
             BuiltInWindowFunction::PercentRank | 
BuiltInWindowFunction::CumeDist => {
                 Ok(DataType::Float64)
             }
-            BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
             BuiltInWindowFunction::Lag
             | BuiltInWindowFunction::Lead
             | BuiltInWindowFunction::FirstValue
diff --git a/datafusion/physical-expr/src/window/cume_dist.rs 
b/datafusion/physical-expr/src/window/cume_dist.rs
index edef77c51c..9720187ea8 100644
--- a/datafusion/physical-expr/src/window/cume_dist.rs
+++ b/datafusion/physical-expr/src/window/cume_dist.rs
@@ -34,11 +34,16 @@ use std::sync::Arc;
 #[derive(Debug)]
 pub struct CumeDist {
     name: String,
+    /// Output data type
+    data_type: DataType,
 }
 
 /// Create a cume_dist window function
-pub fn cume_dist(name: String) -> CumeDist {
-    CumeDist { name }
+pub fn cume_dist(name: String, data_type: &DataType) -> CumeDist {
+    CumeDist {
+        name,
+        data_type: data_type.clone(),
+    }
 }
 
 impl BuiltInWindowFunctionExpr for CumeDist {
@@ -49,8 +54,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
 
     fn field(&self) -> Result<Field> {
         let nullable = false;
-        let data_type = DataType::Float64;
-        Ok(Field::new(self.name(), data_type, nullable))
+        Ok(Field::new(self.name(), self.data_type.clone(), nullable))
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -119,7 +123,7 @@ mod tests {
     #[test]
     #[allow(clippy::single_range_in_vec_init)]
     fn test_cume_dist() -> Result<()> {
-        let r = cume_dist("arr".into());
+        let r = cume_dist("arr".into(), &DataType::Float64);
 
         let expected = vec![0.0; 0];
         test_i32_result(&r, 0, vec![], expected)?;
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs 
b/datafusion/physical-expr/src/window/lead_lag.rs
index 7ee736ce9c..054a4c13e6 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -35,6 +35,7 @@ use std::sync::Arc;
 #[derive(Debug)]
 pub struct WindowShift {
     name: String,
+    /// Output data type
     data_type: DataType,
     shift_offset: i64,
     expr: Arc<dyn PhysicalExpr>,
diff --git a/datafusion/physical-expr/src/window/nth_value.rs 
b/datafusion/physical-expr/src/window/nth_value.rs
index b3c89122eb..05909ab25a 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -39,6 +39,7 @@ use datafusion_expr::PartitionEvaluator;
 pub struct NthValue {
     name: String,
     expr: Arc<dyn PhysicalExpr>,
+    /// Output data type
     data_type: DataType,
     kind: NthValueKind,
 }
diff --git a/datafusion/physical-expr/src/window/ntile.rs 
b/datafusion/physical-expr/src/window/ntile.rs
index f5442e1b0f..fb7a7ad84f 100644
--- a/datafusion/physical-expr/src/window/ntile.rs
+++ b/datafusion/physical-expr/src/window/ntile.rs
@@ -35,11 +35,17 @@ use std::sync::Arc;
 pub struct Ntile {
     name: String,
     n: u64,
+    /// Output data type
+    data_type: DataType,
 }
 
 impl Ntile {
-    pub fn new(name: String, n: u64) -> Self {
-        Self { name, n }
+    pub fn new(name: String, n: u64, data_type: &DataType) -> Self {
+        Self {
+            name,
+            n,
+            data_type: data_type.clone(),
+        }
     }
 
     pub fn get_n(&self) -> u64 {
@@ -54,8 +60,7 @@ impl BuiltInWindowFunctionExpr for Ntile {
 
     fn field(&self) -> Result<Field> {
         let nullable = false;
-        let data_type = DataType::UInt64;
-        Ok(Field::new(self.name(), data_type, nullable))
+        Ok(Field::new(self.name(), self.data_type.clone(), nullable))
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
diff --git a/datafusion/physical-expr/src/window/rank.rs 
b/datafusion/physical-expr/src/window/rank.rs
index 86af5b3221..1f643f0280 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -41,6 +41,8 @@ use std::sync::Arc;
 pub struct Rank {
     name: String,
     rank_type: RankType,
+    /// Output data type
+    data_type: DataType,
 }
 
 impl Rank {
@@ -58,26 +60,29 @@ pub enum RankType {
 }
 
 /// Create a rank window function
-pub fn rank(name: String) -> Rank {
+pub fn rank(name: String, data_type: &DataType) -> Rank {
     Rank {
         name,
         rank_type: RankType::Basic,
+        data_type: data_type.clone(),
     }
 }
 
 /// Create a dense rank window function
-pub fn dense_rank(name: String) -> Rank {
+pub fn dense_rank(name: String, data_type: &DataType) -> Rank {
     Rank {
         name,
         rank_type: RankType::Dense,
+        data_type: data_type.clone(),
     }
 }
 
 /// Create a percent rank window function
-pub fn percent_rank(name: String) -> Rank {
+pub fn percent_rank(name: String, data_type: &DataType) -> Rank {
     Rank {
         name,
         rank_type: RankType::Percent,
+        data_type: data_type.clone(),
     }
 }
 
@@ -89,11 +94,7 @@ impl BuiltInWindowFunctionExpr for Rank {
 
     fn field(&self) -> Result<Field> {
         let nullable = false;
-        let data_type = match self.rank_type {
-            RankType::Basic | RankType::Dense => DataType::UInt64,
-            RankType::Percent => DataType::Float64,
-        };
-        Ok(Field::new(self.name(), data_type, nullable))
+        Ok(Field::new(self.name(), self.data_type.clone(), nullable))
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -268,7 +269,7 @@ mod tests {
 
     #[test]
     fn test_dense_rank() -> Result<()> {
-        let r = dense_rank("arr".into());
+        let r = dense_rank("arr".into(), &DataType::UInt64);
         test_without_rank(&r, vec![1; 8])?;
         test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
         Ok(())
@@ -276,7 +277,7 @@ mod tests {
 
     #[test]
     fn test_rank() -> Result<()> {
-        let r = rank("arr".into());
+        let r = rank("arr".into(), &DataType::UInt64);
         test_without_rank(&r, vec![1; 8])?;
         test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
         Ok(())
@@ -285,7 +286,7 @@ mod tests {
     #[test]
     #[allow(clippy::single_range_in_vec_init)]
     fn test_percent_rank() -> Result<()> {
-        let r = percent_rank("arr".into());
+        let r = percent_rank("arr".into(), &DataType::Float64);
 
         // empty case
         let expected = vec![0.0; 0];
diff --git a/datafusion/physical-expr/src/window/row_number.rs 
b/datafusion/physical-expr/src/window/row_number.rs
index f5e2f65a65..759f447ab0 100644
--- a/datafusion/physical-expr/src/window/row_number.rs
+++ b/datafusion/physical-expr/src/window/row_number.rs
@@ -36,12 +36,17 @@ use std::sync::Arc;
 #[derive(Debug)]
 pub struct RowNumber {
     name: String,
+    /// Output data type
+    data_type: DataType,
 }
 
 impl RowNumber {
     /// Create a new ROW_NUMBER function
-    pub fn new(name: impl Into<String>) -> Self {
-        Self { name: name.into() }
+    pub fn new(name: impl Into<String>, data_type: &DataType) -> Self {
+        Self {
+            name: name.into(),
+            data_type: data_type.clone(),
+        }
     }
 }
 
@@ -53,8 +58,7 @@ impl BuiltInWindowFunctionExpr for RowNumber {
 
     fn field(&self) -> Result<Field> {
         let nullable = false;
-        let data_type = DataType::UInt64;
-        Ok(Field::new(self.name(), data_type, nullable))
+        Ok(Field::new(self.name(), self.data_type.clone(), nullable))
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -127,7 +131,7 @@ mod tests {
         ]));
         let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, 
true)]);
         let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
-        let row_number = RowNumber::new("row_number".to_owned());
+        let row_number = RowNumber::new("row_number".to_owned(), 
&DataType::UInt64);
         let values = row_number.evaluate_args(&batch)?;
         let result = row_number
             .create_evaluator()?
@@ -145,7 +149,7 @@ mod tests {
         ]));
         let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, 
false)]);
         let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
-        let row_number = RowNumber::new("row_number".to_owned());
+        let row_number = RowNumber::new("row_number".to_owned(), 
&DataType::UInt64);
         let values = row_number.evaluate_args(&batch)?;
         let result = row_number
             .create_evaluator()?
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index a85e5cc31c..e55cc7fca7 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -160,12 +160,13 @@ fn create_built_in_window_expr(
     input_schema: &Schema,
     name: String,
 ) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
+    let data_type = input_schema.field_with_name(&name)?.data_type();
     Ok(match fun {
-        BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
-        BuiltInWindowFunction::Rank => Arc::new(rank(name)),
-        BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
-        BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name)),
-        BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)),
+        BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name, 
data_type)),
+        BuiltInWindowFunction::Rank => Arc::new(rank(name, data_type)),
+        BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name, 
data_type)),
+        BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name, 
data_type)),
+        BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name, 
data_type)),
         BuiltInWindowFunction::Ntile => {
             let n = get_scalar_value_from_args(args, 0)?.ok_or_else(|| {
                 DataFusionError::Execution(
@@ -179,32 +180,42 @@ fn create_built_in_window_expr(
 
             if n.is_unsigned() {
                 let n: u64 = n.try_into()?;
-                Arc::new(Ntile::new(name, n))
+                Arc::new(Ntile::new(name, n, data_type))
             } else {
                 let n: i64 = n.try_into()?;
                 if n <= 0 {
                     return exec_err!("NTILE requires a positive integer");
                 }
-                Arc::new(Ntile::new(name, n as u64))
+                Arc::new(Ntile::new(name, n as u64, data_type))
             }
         }
         BuiltInWindowFunction::Lag => {
             let arg = args[0].clone();
-            let data_type = args[0].data_type(input_schema)?;
             let shift_offset = get_scalar_value_from_args(args, 1)?
                 .map(|v| v.try_into())
                 .and_then(|v| v.ok());
             let default_value = get_scalar_value_from_args(args, 2)?;
-            Arc::new(lag(name, data_type, arg, shift_offset, default_value))
+            Arc::new(lag(
+                name,
+                data_type.clone(),
+                arg,
+                shift_offset,
+                default_value,
+            ))
         }
         BuiltInWindowFunction::Lead => {
             let arg = args[0].clone();
-            let data_type = args[0].data_type(input_schema)?;
             let shift_offset = get_scalar_value_from_args(args, 1)?
                 .map(|v| v.try_into())
                 .and_then(|v| v.ok());
             let default_value = get_scalar_value_from_args(args, 2)?;
-            Arc::new(lead(name, data_type, arg, shift_offset, default_value))
+            Arc::new(lead(
+                name,
+                data_type.clone(),
+                arg,
+                shift_offset,
+                default_value,
+            ))
         }
         BuiltInWindowFunction::NthValue => {
             let arg = args[0].clone();
@@ -214,18 +225,15 @@ fn create_built_in_window_expr(
                 .try_into()
                 .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
             let n: u32 = n as u32;
-            let data_type = args[0].data_type(input_schema)?;
-            Arc::new(NthValue::nth(name, arg, data_type, n)?)
+            Arc::new(NthValue::nth(name, arg, data_type.clone(), n)?)
         }
         BuiltInWindowFunction::FirstValue => {
             let arg = args[0].clone();
-            let data_type = args[0].data_type(input_schema)?;
-            Arc::new(NthValue::first(name, arg, data_type))
+            Arc::new(NthValue::first(name, arg, data_type.clone()))
         }
         BuiltInWindowFunction::LastValue => {
             let arg = args[0].clone();
-            let data_type = args[0].data_type(input_schema)?;
-            Arc::new(NthValue::last(name, arg, data_type))
+            Arc::new(NthValue::last(name, arg, data_type.clone()))
         }
     })
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 3a13dc887f..8e0f75ce7d 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -253,7 +253,8 @@ fn roundtrip_nested_loop_join() -> Result<()> {
 fn roundtrip_window() -> Result<()> {
     let field_a = Field::new("a", DataType::Int64, false);
     let field_b = Field::new("b", DataType::Int64, false);
-    let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+    let field_c = Field::new("FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", 
DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a, field_b, field_c]));
 
     let window_frame = WindowFrame::new_bounds(
         datafusion_expr::WindowFrameUnits::Range,
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index f8337e21d7..f6d8a1ce8f 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3906,3 +3906,69 @@ ProjectionExec: expr=[sn@0 as sn, ts@1 as ts, currency@2 
as currency, amount@3 a
 --BoundedWindowAggExec: wdw=[SUM(table_with_pk.amount) ORDER BY 
[table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW: Ok(Field { name: "SUM(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: 
Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: CurrentRow }], mode=[Sorted]
 ----SortExec: expr=[sn@0 ASC NULLS LAST]
 ------MemoryExec: partitions=1, partition_sizes=[1]
+
+# test ROW_NUMBER window function returns correct data_type
+query T
+select arrow_typeof(row_number() over ()) from (select 1 a)
+----
+UInt64
+
+# test RANK window function returns correct data_type
+query T
+select arrow_typeof(rank() over ()) from (select 1 a)
+----
+UInt64
+
+# test DENSE_RANK window function returns correct data_type
+query T
+select arrow_typeof(dense_rank() over ()) from (select 1 a)
+----
+UInt64
+
+# test PERCENT_RANK window function returns correct data_type
+query T
+select arrow_typeof(percent_rank() over ()) from (select 1 a)
+----
+Float64
+
+# test CUME_DIST window function returns correct data_type
+query T
+select arrow_typeof(cume_dist() over ()) from (select 1 a)
+----
+Float64
+
+# test NTILE window function returns correct data_type
+query T
+select arrow_typeof(ntile(1) over ()) from (select 1 a)
+----
+UInt64
+
+# test LAG window function returns correct data_type
+query T
+select arrow_typeof(lag(a) over ()) from (select 1 a)
+----
+Int64
+
+# test LEAD window function returns correct data_type
+query T
+select arrow_typeof(lead(a) over ()) from (select 1 a)
+----
+Int64
+
+# test FIRST_VALUE window function returns correct data_type
+query T
+select arrow_typeof(first_value(a) over ()) from (select 1 a)
+----
+Int64
+
+# test LAST_VALUE window function returns correct data_type
+query T
+select arrow_typeof(last_value(a) over ()) from (select 1 a)
+----
+Int64
+
+# test NTH_VALUE window function returns correct data_type
+query T
+select arrow_typeof(nth_value(a, 1) over ()) from (select 1 a)
+----
+Int64
\ No newline at end of file

Reply via email to