This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new d529d32c5c0 [SPARK-44934][SQL] Use outputSet instead of output to 
check if column pruning occurred in PushdownPredicateAndPruneColumnsForCTEDef
d529d32c5c0 is described below

commit d529d32c5c06d5bbc34c91f70b0f3d8281ed5347
Author: Wen Yuen Pang <wenyuen.p...@databricks.com>
AuthorDate: Thu Aug 24 17:00:04 2023 +0200

    [SPARK-44934][SQL] Use outputSet instead of output to check if column 
pruning occurred in PushdownPredicateAndPruneColumnsForCTEDef
    
    ### What changes were proposed in this pull request?
    
    Originally, when a CTE has duplicate expression IDs in its output, the rule 
PushdownPredicatesAndPruneColumnsForCTEDef wrongly assesses that the columns in 
the CTE were pruned, as it compares the size of the attribute set containing 
the union of columns (which is unique) and the original output of the CTE 
(which contains duplicate columns) and notices that the former is less than the 
latter. This causes incorrect pruning of the CTE output, resulting in a missing 
reference and causing  [...]
    
    This PR changes the logic to use the needsPruning function to assess 
whether a CTE has been pruned, which uses the outputSet to check if any columns 
has been pruned instead of the output.
    
    ### Why are the changes needed?
    
    The incorrect behaviour of PushdownPredicatesAndPruneColumnsForCTEDef in 
CTEs with duplicate expression IDs in its output causes a crash when such a 
query is run.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test for the crashing case was added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #42635 from wenyuen-db/SPARK-44934.
    
    Authored-by: Wen Yuen Pang <wenyuen.p...@databricks.com>
    Signed-off-by: Peter Toth <peter.t...@gmail.com>
    (cherry picked from commit 3b405948ee47702e5a7250dc27430836145b0e19)
    Signed-off-by: Peter Toth <peter.t...@gmail.com>
---
 ...ushdownPredicatesAndPruneColumnsForCTEDef.scala |  2 +-
 .../org/apache/spark/sql/CTEInlineSuite.scala      | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
index 2195eef2fc9..ffe897d1764 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
@@ -142,7 +142,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends 
Rule[LogicalPlan] {
 
     case cteRef @ CTERelationRef(cteId, _, output, _) =>
       val (cteDef, _, _, newAttrSet) = cteMap(cteId)
-      if (newAttrSet.size < output.size) {
+      if (needsPruning(cteDef.child, newAttrSet)) {
         val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)
         val newOutput = indices.map(output)
         cteRef.copy(output = newOutput)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index e758c6f8df5..88109f25659 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -639,6 +639,39 @@ abstract class CTEInlineSuiteBase
       }
     }
   }
+
+  test("SPARK-44934: CTE column pruning handles duplicate exprIds in CTE") {
+    withTempView("t") {
+      Seq((0, 1, 2), (1, 2, 3)).toDF("c1", "c2", 
"c3").createOrReplaceTempView("t")
+      val query =
+        """
+          |with cte as (
+          |  select c1, c1, c2, c3 from t where random() > 0
+          |)
+          |select cte.c1, cte2.c1, cte.c2, cte2.c3 from
+          |  (select c1, c2 from cte) cte
+          |    inner join
+          |  (select c1, c3 from cte) cte2
+          |    on cte.c1 = cte2.c1
+          """.stripMargin
+
+      val df = sql(query)
+      checkAnswer(df, Row(0, 0, 1, 2) :: Row(1, 1, 2, 3) :: Nil)
+      assert(
+        df.queryExecution.analyzed.collect {
+          case WithCTE(_, cteDefs) => cteDefs
+        }.head.length == 1,
+        "With-CTE should contain 1 CTE def after analysis.")
+      val cteRepartitions = df.queryExecution.optimizedPlan.collect {
+        case r: RepartitionOperation => r
+      }
+      assert(cteRepartitions.length == 2,
+        "CTE should not be inlined after optimization.")
+      assert(cteRepartitions.head.collectFirst {
+        case p: Project if p.projectList.length == 4 => p
+      }.isDefined, "CTE columns should not be pruned.")
+    }
+  }
 }
 
 class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with 
DisableAdaptiveExecutionSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to