This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 898f3d8a0bd Revert "[SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs"
898f3d8a0bd is described below

commit 898f3d8a0bdc044838a576789c9f26e96e73bc60
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Aug 10 23:04:07 2023 -0700

    Revert "[SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs"
    
    This reverts commit b8204d1b89eea0c32e5269fb155651157e2c96e8.
---
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala   | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 848749f9e3b..e198fd58953 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1071,13 +1071,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     def check(plan: LogicalPlan): Unit = plan.foreach { node =>
       node match {
         case metrics @ CollectMetrics(name, _, _) =>
-          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized)
+          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
           metricsMap.get(name) match {
             case Some(other) =>
-              val simplifiedOther = simplifyPlanForCollectedMetrics(other.canonicalized)
+              val simplifiedOther = simplifyPlanForCollectedMetrics(other)
               // Exact duplicates are allowed. They can be the result
               // of a CTE that is used multiple times or a self join.
-              if (simplifiedMetrics != simplifiedOther) {
+              if (!simplifiedMetrics.sameResult(simplifiedOther)) {
                 failAnalysis(
                   errorClass = "DUPLICATED_METRICS_NAME",
                   messageParameters = Map("metricName" -> name))
@@ -1102,7 +1102,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
    * duplicates metric definition.
    */
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = {
-    plan.resolveOperators {
+    plan.transformUpWithNewOutput {
       case p: Project if p.projectList.size == p.child.output.size =>
         val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
           case (left: Alias, right: Attribute) =>
@@ -1110,9 +1110,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
           case _ => false
         }
         if (assignExprIdOnly) {
-          p.child
+          (p.child, p.output.zip(p.child.output))
         } else {
-          p
+          (p, Nil)
         }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to