This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 819d357787 Add support for functional dependency for ROW_NUMBER window function. (#8737)
819d357787 is described below

commit 819d3577872a082f2aea7a68ae83d68534049662
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Jan 4 09:39:02 2024 +0300

    Add support for functional dependency for ROW_NUMBER window function. (#8737)
    
    * Add primary key support for row_number window function
    
    * Add comments, minor changes
    
    * Add new test
    
    * Review
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/expr/src/logical_plan/plan.rs      | 59 +++++++++++++++++++++++----
 datafusion/sqllogictest/test_files/window.slt | 40 +++++++++++++++++-
 2 files changed, 91 insertions(+), 8 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index c0c520c4e2..93a38fb40d 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -25,7 +25,9 @@ use std::sync::Arc;
 use super::dml::CopyTo;
 use super::DdlStatement;
 use crate::dml::CopyOptions;
-use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr};
+use crate::expr::{
+    Alias, Exists, InSubquery, Placeholder, Sort as SortExpr, WindowFunction,
+};
 use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
 use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
 use crate::logical_plan::extension::UserDefinedLogicalNode;
@@ -36,9 +38,9 @@ use crate::utils::{
     split_conjunction,
 };
 use crate::{
-    build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, 
CreateView, Expr,
-    ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown,
-    TableSource,
+    build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction,
+    CreateMemoryTable, CreateView, Expr, ExprSchemable, LogicalPlanBuilder, 
Operator,
+    TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
 };
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -48,9 +50,10 @@ use datafusion_common::tree_node::{
 };
 use datafusion_common::{
     aggregate_functional_dependencies, internal_err, plan_err, Column, 
Constraints,
-    DFField, DFSchema, DFSchemaRef, DataFusionError, Dependency, 
FunctionalDependencies,
-    OwnedTableReference, ParamValues, Result, UnnestOptions,
+    DFField, DFSchema, DFSchemaRef, DataFusionError, Dependency, 
FunctionalDependence,
+    FunctionalDependencies, OwnedTableReference, ParamValues, Result, 
UnnestOptions,
 };
+
 // backwards compatibility
 pub use datafusion_common::display::{PlanType, StringifiedPlan, 
ToStringifiedPlan};
 pub use datafusion_common::{JoinConstraint, JoinType};
@@ -1967,7 +1970,9 @@ pub struct Window {
 impl Window {
     /// Create a new window operator.
     pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> 
Result<Self> {
-        let mut window_fields: Vec<DFField> = input.schema().fields().clone();
+        let fields = input.schema().fields();
+        let input_len = fields.len();
+        let mut window_fields = fields.clone();
         
window_fields.extend_from_slice(&exprlist_to_fields(window_expr.iter(), 
&input)?);
         let metadata = input.schema().metadata().clone();
 
@@ -1976,6 +1981,46 @@ impl Window {
             input.schema().functional_dependencies().clone();
         window_func_dependencies.extend_target_indices(window_fields.len());
 
+        // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
+        // of consecutive numbers per partition), we can represent this fact with
+        // functional dependencies.
+        // functional dependencies.
+        let mut new_dependencies = window_expr
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, expr)| {
+                if let Expr::WindowFunction(WindowFunction {
+                    // Function is ROW_NUMBER
+                    fun:
+                        WindowFunctionDefinition::BuiltInWindowFunction(
+                            BuiltInWindowFunction::RowNumber,
+                        ),
+                    partition_by,
+                    ..
+                }) = expr
+                {
+                    // When there is no PARTITION BY, row number will be unique
+                    // across the entire table.
+                    if partition_by.is_empty() {
+                        return Some(idx + input_len);
+                    }
+                }
+                None
+            })
+            .map(|idx| {
+                FunctionalDependence::new(vec![idx], vec![], false)
+                    .with_mode(Dependency::Single)
+            })
+            .collect::<Vec<_>>();
+
+        if !new_dependencies.is_empty() {
+            for dependence in new_dependencies.iter_mut() {
+                dependence.target_indices = (0..window_fields.len()).collect();
+            }
+            // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
+            let new_deps = FunctionalDependencies::new(new_dependencies);
+            window_func_dependencies.extend(new_deps);
+        }
+
         Ok(Window {
             input,
             window_expr,
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index aa083290b4..7d6d592013 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3832,4 +3832,42 @@ select row_number() over (partition by 1 order by 1) rn,
 from (select 1 a union all select 2 a) x;
 ----
 1 1 1 1 1 1
-2 1 1 2 2 1
\ No newline at end of file
+2 1 1 2 2 1
+
+# when partition by expression is empty row number result will be unique.
+query TII
+SELECT *
+FROM (SELECT c1, c2, ROW_NUMBER() OVER() as rn
+    FROM aggregate_test_100
+    LIMIT 5)
+GROUP BY rn
+ORDER BY rn;
+----
+c 2 1
+d 5 2
+b 1 3
+a 1 4
+b 5 5
+
+# when partition by expression is constant row number result will be unique.
+query TII
+SELECT *
+FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY 3) as rn
+    FROM aggregate_test_100
+    LIMIT 5)
+GROUP BY rn
+ORDER BY rn;
+----
+c 2 1
+d 5 2
+b 1 3
+a 1 4
+b 5 5
+
+statement error DataFusion error: Error during planning: Projection references 
non-aggregate values: Expression aggregate_test_100.c1 could not be resolved 
from available columns: rn
+SELECT *
+FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY c1) as rn
+    FROM aggregate_test_100
+    LIMIT 5)
+GROUP BY rn
+ORDER BY rn;

Reply via email to