This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new dae1314003e [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput 
should only be used with new outputs
dae1314003e is described below

commit dae1314003e6347f9403b24918cebe4de9fcd8dd
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Aug 11 21:02:07 2023 +0800

    [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput should only be used 
with new outputs
    
    This resubmits https://github.com/apache/spark/pull/42408 to 3.5
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/41475 . It's 
risky to use `transformUpWithNewOutput` with existing attribute ids. If the 
plan contains duplicated attribute ids somewhere, then we will hit conflicting 
attributes and an assertion error will be thrown by 
`QueryPlan#transformUpWithNewOutput`.
    
    This PR takes a different approach. We canonicalize the plan first and then 
remove the alias-only project. Then we don't need `transformUpWithNewOutput` 
anymore as all attribute ids are normalized in the canonicalized plan.
    
    ### Why are the changes needed?
    
    fix potential bugs
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #42449 from cloud-fan/minor.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index e198fd58953..e063acaada5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1071,13 +1071,13 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
     def check(plan: LogicalPlan): Unit = plan.foreach { node =>
       node match {
         case metrics @ CollectMetrics(name, _, _) =>
-          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
+          val simplifiedMetrics = 
simplifyPlanForCollectedMetrics(metrics.canonicalized)
           metricsMap.get(name) match {
             case Some(other) =>
-              val simplifiedOther = simplifyPlanForCollectedMetrics(other)
+              val simplifiedOther = 
simplifyPlanForCollectedMetrics(other.canonicalized)
               // Exact duplicates are allowed. They can be the result
               // of a CTE that is used multiple times or a self join.
-              if (!simplifiedMetrics.sameResult(simplifiedOther)) {
+              if (simplifiedMetrics != simplifiedOther) {
                 failAnalysis(
                   errorClass = "DUPLICATED_METRICS_NAME",
                   messageParameters = Map("metricName" -> name))
@@ -1102,17 +1102,20 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
    * duplicates metric definition.
    */
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan 
= {
-    plan.transformUpWithNewOutput {
+    plan.resolveOperators {
       case p: Project if p.projectList.size == p.child.output.size =>
-        val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
-          case (left: Alias, right: Attribute) =>
-            left.child.semanticEquals(right) && right.name == left.name
+        val assignExprIdOnly = p.projectList.zipWithIndex.forall {
+          case (Alias(attr: AttributeReference, _), index) =>
+            // The input plan of this method is already canonicalized. The 
attribute id becomes the
+            // ordinal of this attribute in the child outputs. So an 
alias-only Project means the
+            // the id of the aliased attribute is the same as its index in the 
project list.
+            attr.exprId.id == index
           case _ => false
         }
         if (assignExprIdOnly) {
-          (p.child, p.output.zip(p.child.output))
+          p.child
         } else {
-          (p, Nil)
+          p
         }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to