This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new 2c1ca2f337 [branch-53] fix: SanityCheckPlan error with window 
functions and NVL filter (#20231) (#20932)
2c1ca2f337 is described below

commit 2c1ca2f33776999bf4e2f7ab86214845a0666635
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Mar 13 20:22:17 2026 -0400

    [branch-53] fix: SanityCheckPlan error with window functions and NVL filter 
(#20231) (#20932)
    
    - Part of https://github.com/apache/datafusion/issues/19692
    - Closes https://github.com/apache/datafusion/issues/20194 on branch-53
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20231 from
    @EeshanBembi to the branch-53 line
    
    Co-authored-by: EeshanBembi <[email protected]>
---
 datafusion/physical-plan/src/filter.rs        | 101 ++++++++++++++++++++++----
 datafusion/sqllogictest/test_files/window.slt |  46 ++++++++++++
 2 files changed, 134 insertions(+), 13 deletions(-)

diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index 7bc5d346cd..21f5727d86 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -57,12 +57,12 @@ use datafusion_common::{
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
+use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
 use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
 use datafusion_physical_expr::{
-    AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, 
PhysicalExpr, analyze,
-    conjunction, split_conjunction,
+    AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, 
ExprBoundaries,
+    PhysicalExpr, analyze, conjunction, split_conjunction,
 };
 
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -348,6 +348,20 @@ impl FilterExec {
         })
     }
 
+    /// Returns the `AcrossPartitions` value for `expr` if it is constant:
+    /// either already known constant in `input_eqs`, or a `Literal`
+    /// (which is inherently constant across all partitions).
+    fn expr_constant_or_literal(
+        expr: &Arc<dyn PhysicalExpr>,
+        input_eqs: &EquivalenceProperties,
+    ) -> Option<AcrossPartitions> {
+        input_eqs.is_expr_constant(expr).or_else(|| {
+            expr.as_any()
+                .downcast_ref::<Literal>()
+                .map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
+        })
+    }
+
     fn extend_constants(
         input: &Arc<dyn ExecutionPlan>,
         predicate: &Arc<dyn PhysicalExpr>,
@@ -360,18 +374,24 @@ impl FilterExec {
             if let Some(binary) = 
conjunction.as_any().downcast_ref::<BinaryExpr>()
                 && binary.op() == &Operator::Eq
             {
-                // Filter evaluates to single value for all partitions
-                if input_eqs.is_expr_constant(binary.left()).is_some() {
-                    let across = input_eqs
-                        .is_expr_constant(binary.right())
-                        .unwrap_or_default();
+                // Check if either side is constant — either already known
+                // constant from the input equivalence properties, or a literal
+                // value (which is inherently constant across all partitions).
+                let left_const = Self::expr_constant_or_literal(binary.left(), 
input_eqs);
+                let right_const =
+                    Self::expr_constant_or_literal(binary.right(), input_eqs);
+
+                if let Some(left_across) = left_const {
+                    // LEFT is constant, so RIGHT must also be constant.
+                    // Use RIGHT's known across value if available, otherwise
+                    // propagate LEFT's (e.g. Uniform from a literal).
+                    let across = right_const.unwrap_or(left_across);
                     res_constants
                         .push(ConstExpr::new(Arc::clone(binary.right()), 
across));
-                } else if input_eqs.is_expr_constant(binary.right()).is_some() 
{
-                    let across = input_eqs
-                        .is_expr_constant(binary.left())
-                        .unwrap_or_default();
-                    
res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
+                } else if let Some(right_across) = right_const {
+                    // RIGHT is constant, so LEFT must also be constant.
+                    res_constants
+                        .push(ConstExpr::new(Arc::clone(binary.left()), 
right_across));
                 }
             }
         }
@@ -1003,6 +1023,19 @@ fn collect_columns_from_predicate_inner(
     let predicates = split_conjunction(predicate);
     predicates.into_iter().for_each(|p| {
         if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
+            // Only extract pairs where at least one side is a Column 
reference.
+            // Pairs like `complex_expr = literal` should not create 
equivalence
+            // classes — the literal could appear in many unrelated expressions
+            // (e.g. sort keys), and normalize_expr's deep traversal would
+            // replace those occurrences with the complex expression, 
corrupting
+            // sort orderings. Constant propagation for such pairs is handled
+            // separately by `extend_constants`.
+            let has_direct_column_operand =
+                binary.left().as_any().downcast_ref::<Column>().is_some()
+                    || 
binary.right().as_any().downcast_ref::<Column>().is_some();
+            if !has_direct_column_operand {
+                return;
+            }
             match binary.op() {
                 Operator::Eq => {
                     eq_predicate_columns.push((binary.left(), binary.right()))
@@ -2155,6 +2188,48 @@ mod tests {
         Ok(())
     }
 
+    /// Regression test for https://github.com/apache/datafusion/issues/20194
+    ///
+    /// `collect_columns_from_predicate_inner` should only extract equality
+    /// pairs where at least one side is a Column. Pairs like
+    /// `complex_expr = literal` must not create equivalence classes because
+    /// `normalize_expr`'s deep traversal would replace the literal inside
+    /// unrelated expressions (e.g. sort keys) with the complex expression.
+    #[test]
+    fn test_collect_columns_skips_non_column_pairs() -> Result<()> {
+        let schema = test::aggr_test_schema();
+
+        // Simulate: nvl(c2, 0) = 0  →  (c2 IS DISTINCT FROM 0) = 0
+        // Neither side is a Column, so this should NOT be extracted.
+        let complex_expr: Arc<dyn PhysicalExpr> = binary(
+            col("c2", &schema)?,
+            Operator::IsDistinctFrom,
+            lit(0u32),
+            &schema,
+        )?;
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(complex_expr, Operator::Eq, lit(0u32), &schema)?;
+
+        let (equal_pairs, _) = 
collect_columns_from_predicate_inner(&predicate);
+        assert_eq!(
+            0,
+            equal_pairs.len(),
+            "Should not extract equality pairs where neither side is a Column"
+        );
+
+        // But col = literal should still be extracted
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("c2", &schema)?, Operator::Eq, lit(0u32), &schema)?;
+        let (equal_pairs, _) = 
collect_columns_from_predicate_inner(&predicate);
+        assert_eq!(
+            1,
+            equal_pairs.len(),
+            "Should extract equality pairs where one side is a Column"
+        );
+
+        Ok(())
+    }
+
     /// Columns with Absent min/max statistics should remain Absent after
     /// FilterExec.
     #[tokio::test]
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index c3e6f39adb..9fc053d38c 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -6081,3 +6081,49 @@ WHERE acctbal > (
 );
 ----
 1
+
+# Regression test for https://github.com/apache/datafusion/issues/20194
+# Window function with CASE WHEN in ORDER BY combined with NVL filter
+# should not trigger SanityCheckPlan error from equivalence normalization
+# replacing literals in sort expressions with complex filter expressions.
+statement ok
+CREATE TABLE issue_20194_t1 (
+  value_1_1 decimal(25) NULL,
+  value_1_2 int NULL,
+  value_1_3 bigint NULL
+);
+
+statement ok
+CREATE TABLE issue_20194_t2 (
+  value_2_1 bigint NULL,
+  value_2_2 varchar(140) NULL,
+  value_2_3 varchar(140) NULL
+);
+
+statement ok
+INSERT INTO issue_20194_t1 (value_1_1, value_1_2, value_1_3) VALUES 
(6774502793, 10040029, 1120);
+
+statement ok
+INSERT INTO issue_20194_t2 (value_2_1, value_2_2, value_2_3) VALUES (1120, 
'0', '0');
+
+query RII
+SELECT
+  t1.value_1_1, t1.value_1_2,
+  ROW_NUMBER() OVER (
+    PARTITION BY t1.value_1_1, t1.value_1_2
+    ORDER BY
+      CASE WHEN t2.value_2_2 = '0' THEN 1 ELSE 0 END ASC,
+      CASE WHEN t2.value_2_3 = '0' THEN 1 ELSE 0 END ASC
+  ) AS ord
+FROM issue_20194_t1 t1
+INNER JOIN issue_20194_t2 t2
+  ON t1.value_1_3 = t2.value_2_1
+  AND nvl(t2.value_2_3, '0') = '0';
+----
+6774502793 10040029 1
+
+statement ok
+DROP TABLE issue_20194_t1;
+
+statement ok
+DROP TABLE issue_20194_t2;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to