This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 819d357787 Add support for functional dependency for ROW_NUMBER window function. (#8737)
819d357787 is described below
commit 819d3577872a082f2aea7a68ae83d68534049662
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Jan 4 09:39:02 2024 +0300
Add support for functional dependency for ROW_NUMBER window function. (#8737)
* Add primary key support for row_number window function
* Add comments, minor changes
* Add new test
* Review
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/expr/src/logical_plan/plan.rs | 59 +++++++++++++++++++++++----
datafusion/sqllogictest/test_files/window.slt | 40 +++++++++++++++++-
2 files changed, 91 insertions(+), 8 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index c0c520c4e2..93a38fb40d 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -25,7 +25,9 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
use crate::dml::CopyOptions;
-use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr};
+use crate::expr::{
+ Alias, Exists, InSubquery, Placeholder, Sort as SortExpr, WindowFunction,
+};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
@@ -36,9 +38,9 @@ use crate::utils::{
split_conjunction,
};
use crate::{
- build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
- ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown,
- TableSource,
+ build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction,
+ CreateMemoryTable, CreateView, Expr, ExprSchemable, LogicalPlanBuilder, Operator,
+ TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -48,9 +50,10 @@ use datafusion_common::tree_node::{
};
use datafusion_common::{
aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
- DFField, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependencies,
- OwnedTableReference, ParamValues, Result, UnnestOptions,
+ DFField, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
+ FunctionalDependencies, OwnedTableReference, ParamValues, Result, UnnestOptions,
};
+
// backwards compatibility
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};
@@ -1967,7 +1970,9 @@ pub struct Window {
impl Window {
/// Create a new window operator.
pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
- let mut window_fields: Vec<DFField> = input.schema().fields().clone();
+ let fields = input.schema().fields();
+ let input_len = fields.len();
+ let mut window_fields = fields.clone();
window_fields.extend_from_slice(&exprlist_to_fields(window_expr.iter(), &input)?);
let metadata = input.schema().metadata().clone();
@@ -1976,6 +1981,46 @@ impl Window {
input.schema().functional_dependencies().clone();
window_func_dependencies.extend_target_indices(window_fields.len());
+ // Since we know that ROW_NUMBER outputs will be unique (i.e. it consists
+ // of consecutive numbers per partition), we can represent this fact with
+ // functional dependencies.
+ let mut new_dependencies = window_expr
+ .iter()
+ .enumerate()
+ .filter_map(|(idx, expr)| {
+ if let Expr::WindowFunction(WindowFunction {
+ // Function is ROW_NUMBER
+ fun:
+ WindowFunctionDefinition::BuiltInWindowFunction(
+ BuiltInWindowFunction::RowNumber,
+ ),
+ partition_by,
+ ..
+ }) = expr
+ {
+ // When there is no PARTITION BY, row number will be unique
+ // across the entire table.
+ if partition_by.is_empty() {
+ return Some(idx + input_len);
+ }
+ }
+ None
+ })
+ .map(|idx| {
+ FunctionalDependence::new(vec![idx], vec![], false)
+ .with_mode(Dependency::Single)
+ })
+ .collect::<Vec<_>>();
+
+ if !new_dependencies.is_empty() {
+ for dependence in new_dependencies.iter_mut() {
+ dependence.target_indices = (0..window_fields.len()).collect();
+ }
+ // Add the dependency introduced because of ROW_NUMBER window function to the functional dependency
+ let new_deps = FunctionalDependencies::new(new_dependencies);
+ window_func_dependencies.extend(new_deps);
+ }
+
Ok(Window {
input,
window_expr,
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index aa083290b4..7d6d592013 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3832,4 +3832,42 @@ select row_number() over (partition by 1 order by 1) rn,
from (select 1 a union all select 2 a) x;
----
1 1 1 1 1 1
-2 1 1 2 2 1
\ No newline at end of file
+2 1 1 2 2 1
+
+# when partition by expression is empty row number result will be unique.
+query TII
+SELECT *
+FROM (SELECT c1, c2, ROW_NUMBER() OVER() as rn
+ FROM aggregate_test_100
+ LIMIT 5)
+GROUP BY rn
+ORDER BY rn;
+----
+c 2 1
+d 5 2
+b 1 3
+a 1 4
+b 5 5
+
+# when partition by expression is constant row number result will be unique.
+query TII
+SELECT *
+FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY 3) as rn
+ FROM aggregate_test_100
+ LIMIT 5)
+GROUP BY rn
+ORDER BY rn;
+----
+c 2 1
+d 5 2
+b 1 3
+a 1 4
+b 5 5
+
+statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression aggregate_test_100.c1 could not be resolved from available columns: rn
+SELECT *
+FROM (SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY c1) as rn
+ FROM aggregate_test_100
+ LIMIT 5)
+GROUP BY rn
+ORDER BY rn;