This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f093f405c Implement cardinality_effect for window execs and UnionExec (#20321)
0f093f405c is described below

commit 0f093f405c399df8673298a4ecc6814f0823ea9e
Author: Namgung Chan <[email protected]>
AuthorDate: Thu Mar 5 07:28:55 2026 +0900

    Implement cardinality_effect for window execs and UnionExec (#20321)
    
    ## Which issue does this PR close?
    
    - Closes #20291.
    
    ## Rationale for this change
    
    `WindowAggExec` and `BoundedWindowAggExec` did not implement
    `cardinality_effect`, which left this property as `Unknown`.
    
    Both operators preserve row cardinality:
    - They evaluate window expressions per input row and append result
    columns.
    - They do not filter out rows.
    - They do not duplicate rows.
    
    So their cardinality effect is `Equal`.
    
    This PR also updates `UnionExec`, which combines rows from multiple
    children. Its cardinality effect should be `GreaterEqual` instead of
    defaulting to `Unknown`.
    
    ## What changes are included in this PR?
    
    - Implement `cardinality_effect` for `WindowAggExec` as
    `CardinalityEffect::Equal`.
    - Implement `cardinality_effect` for `BoundedWindowAggExec` as
    `CardinalityEffect::Equal`.
    - Implement `cardinality_effect` for `UnionExec` as
    `CardinalityEffect::GreaterEqual`.
    
    ## Are these changes tested?
    
    Unit tested.
    
    ## Are there any user-facing changes?
    
    No.
    
    ## Additional note
    
    I used a coding agent for implementation/PR drafting and reviewed the
    changes myself. If this conflicts with project policy, please let me
    know.
---
 datafusion/physical-plan/src/union.rs              | 31 +++++++++++++-
 .../src/windows/bounded_window_agg_exec.rs         | 24 +++++++++++
 .../physical-plan/src/windows/window_agg_exec.rs   | 50 +++++++++++++++++++++-
 3 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 5c4f821c98..168048295d 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -34,8 +34,8 @@ use super::{
 };
 use crate::check_if_same_properties;
 use crate::execution_plan::{
-    InvariantLevel, boundedness_from_children, check_default_invariants,
-    emission_type_from_children,
+    CardinalityEffect, InvariantLevel, boundedness_from_children,
+    check_default_invariants, emission_type_from_children,
 };
 use crate::filter::FilterExec;
 use crate::filter_pushdown::{
@@ -360,6 +360,12 @@ impl ExecutionPlan for UnionExec {
         }
     }
 
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        // Union combines rows from multiple inputs, so output rows are not tied
+        // to any single input and can only be constrained as greater-or-equal.
+        CardinalityEffect::GreaterEqual
+    }
+
     fn supports_limit_pushdown(&self) -> bool {
         true
     }
@@ -1210,4 +1216,25 @@ mod tests {
             )
         );
     }
+
+    #[test]
+    fn test_union_cardinality_effect() -> Result<()> {
+        let schema = create_test_schema()?;
+        let input1: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+        let input2: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+
+        let union = UnionExec::try_new(vec![input1, input2])?;
+        let union = union
+            .as_any()
+            .downcast_ref::<UnionExec>() // 2+ inputs are expected to yield a UnionExec
+            .expect("expected UnionExec for multiple inputs");
+
+        assert!(matches!(
+            union.cardinality_effect(),
+            CardinalityEffect::GreaterEqual // union output >= rows of any one input
+        ));
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 0a3d592776..f589b4d748 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -66,6 +66,7 @@ use datafusion_physical_expr_common::sort_expr::{
     OrderingRequirements, PhysicalSortExpr,
 };
 
+use crate::execution_plan::CardinalityEffect;
 use ahash::RandomState;
 use futures::stream::Stream;
 use futures::{StreamExt, ready};
@@ -398,6 +399,10 @@ impl ExecutionPlan for BoundedWindowAggExec {
         let input_stat = self.input.partition_statistics(partition)?;
         self.statistics_helper(input_stat)
     }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal // one output row per input row; none dropped or duplicated
+    }
 }
 
 /// Trait that specifies how we search for (or calculate) partitions. It has two
@@ -1266,6 +1271,7 @@ mod tests {
     use std::time::Duration;
 
     use crate::common::collect;
+    use crate::execution_plan::CardinalityEffect;
     use crate::expressions::PhysicalSortExpr;
     use crate::projection::{ProjectionExec, ProjectionExpr};
     use crate::streaming::{PartitionStream, StreamingTableExec};
@@ -1850,4 +1856,22 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_bounded_window_agg_cardinality_effect() -> Result<()> {
+        let schema = test_schema();
+        let input: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+        let plan = bounded_window_exec_pb_latent_range(input, 1, "hash", "sn")?;
+        let plan = plan
+            .as_any()
+            .downcast_ref::<BoundedWindowAggExec>() // confirm the helper built this exec
+            .expect("expected BoundedWindowAggExec");
+
+        assert!(matches!(
+            plan.cardinality_effect(),
+            CardinalityEffect::Equal // window execs keep exactly one row per input row
+        ));
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 01710d5b3c..f71f1cbfe6 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use super::utils::create_schema;
-use crate::execution_plan::EmissionType;
+use crate::execution_plan::{CardinalityEffect, EmissionType};
 use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use crate::windows::{
     calc_requirements, get_ordered_partition_by_indices, 
get_partition_by_sort_exprs,
@@ -315,6 +315,10 @@ impl ExecutionPlan for WindowAggExec {
             total_byte_size: Precision::Absent,
         })
     }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal // one output row per input row; none dropped or duplicated
+    }
 }
 
 /// Compute the window aggregate columns
@@ -464,3 +468,47 @@ impl RecordBatchStream for WindowAggStream {
         Arc::clone(&self.schema)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test::TestMemoryExec;
+    use crate::windows::create_window_expr;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{
+        WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
+    };
+    use datafusion_functions_aggregate::count::count_udaf;
+
+    #[test]
+    fn test_window_agg_cardinality_effect() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
+        let input: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+        let args = vec![crate::expressions::col("a", &schema)?];
+        let window_expr = create_window_expr(
+            &WindowFunctionDefinition::AggregateUDF(count_udaf()),
+            "count(a)".to_string(),
+            &args,
+            &[],
+            &[],
+            Arc::new(WindowFrame::new_bounds(
+                WindowFrameUnits::Rows,
+                WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                WindowFrameBound::CurrentRow,
+            )),
+            Arc::clone(&schema),
+            false,
+            false,
+            None,
+        )?;
+
+        let window = WindowAggExec::try_new(vec![window_expr], input, true)?;
+        assert!(matches!(
+            window.cardinality_effect(),
+            CardinalityEffect::Equal // one output row per input row
+        ));
+        Ok(())
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to