This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b676a2d9a67 [SPARK-43979][SQL][FOLLOW-UP] Simplify metrics plan should replace nodes by new attributes

b676a2d9a67 is described below

commit b676a2d9a67c121c563979f93605d1215473ae32
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Sun Jun 18 13:45:46 2023 -0700

    [SPARK-43979][SQL][FOLLOW-UP] Simplify metrics plan should replace nodes by new attributes

    ### What changes were proposed in this pull request?

    https://github.com/apache/spark/pull/41475 introduced a fix that removes the extra alias-only Project, which could otherwise cause a mismatch between occurrences of the same metrics over the query plan. However, to make the fix more robust, we also need to replace the attributes when we drop the extra Project, so that parent operators do not keep referencing the dropped Project's output.

    ### Why are the changes needed?

    Enhance the fix to cover more test cases.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    UT

    Closes #41620 from amaliujia/fix_json_followup.

    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
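For context on the API this change relies on: resolveOperatorsUpWithNewOutput lets a rule return the rewritten node together with a mapping from the node's old output attributes to the new ones, and the framework then rewrites attribute references in parent operators while walking back up the plan. Below is a minimal sketch of that pattern, mirroring the patch; note that the exact alias-only condition sits on a line elided between the two hunks in the diff, so the semanticEquals check here is an illustrative stand-in, not necessarily the shipped condition.

    import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    object SimplifySketch {
      // Sketch of the rewrite contract: returning only p.child (as the old
      // resolveOperatorsDown rule did) drops the Project but leaves parent
      // operators referencing the dropped Project's ExprIds. Returning the
      // new node together with an old->new attribute mapping lets the
      // framework rewrite those references as it walks back up the plan.
      def dropAliasOnlyProjects(plan: LogicalPlan): LogicalPlan = {
        plan.resolveOperatorsUpWithNewOutput {
          case p: Project if p.projectList.size == p.child.output.size =>
            val aliasOnly = p.projectList.zip(p.child.output).forall {
              // Illustrative stand-in: the shipped condition is on a line
              // elided between the two hunks of the diff below.
              case (a: Alias, attr: Attribute) => a.child.semanticEquals(attr)
              case _ => false
            }
            if (aliasOnly) (p.child, p.output.zip(p.child.output)) else (p, Nil)
        }
      }
    }

Returning (p, Nil) signals that the node is unchanged and no attribute references need rewriting.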
---
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala      |  6 +++---
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala  | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index d3dc9a75dd5..e47966f1e27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1081,7 +1081,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
    * duplicates metric definition.
    */
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = {
-    plan.resolveOperatorsDown {
+    plan.resolveOperatorsUpWithNewOutput {
       case p: Project if p.projectList.size == p.child.output.size =>
         val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
           case (left: Alias, right: Attribute) =>
@@ -1089,9 +1089,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
           case _ => false
         }
         if (assignExprIdOnly) {
-          p.child
+          (p.child, p.output.zip(p.child.output))
         } else {
-          p
+          (p, Nil)
         }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 7b4a4a52a85..381c7714402 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4682,6 +4682,22 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
         """
       ).observe("my_event", count("*"))
     df1.crossJoin(df1)
+
+    val df2 = spark.sql(
+      """
+        WITH t1 AS (
+        SELECT customer_id, age, row_number() OVER(PARTITION BY customer_id ORDER BY age ASC) rn
+        FROM tmp_view)
+        SELECT customer_id, age FROM t1 WHERE rn = 1
+      """.stripMargin
+    ).observe("my_event2", count("*")).as("df2")
+
+    val df3 = spark.range(1, 5).toDF("id").withColumn("zaak_id", lit(1))
+      .withColumn("targetid", lit(2)).as("df3")
+    val df4 = spark.range(1, 5).toDF("id").withColumn("zaak_id", lit(2)).as("df4")
+    val df5 = df4.join(df2, col("df4.id") === col("df2.customer_id"), "inner")
+    val df6 = df3.join(df2, col("df3.zaak_id") === col("df2.customer_id"), "outer")
+    df5.crossJoin(df6)
   }
 }
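To try the shape of the new test outside the suite, here is a self-contained sketch: a DataFrame with an observed metric (CollectMetrics) reused across several join branches. The SparkSession setup, sample rows, and object name are illustrative additions; only the query shape comes from the test above.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object ObservedJoinSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]").appName("sketch").getOrCreate()
        import spark.implicits._

        // Illustrative sample data backing tmp_view (not part of the patch).
        Seq((1, 10), (1, 20), (2, 30)).toDF("customer_id", "age")
          .createOrReplaceTempView("tmp_view")

        val df2 = spark.sql(
          """WITH t1 AS (
            |  SELECT customer_id, age,
            |         row_number() OVER (PARTITION BY customer_id ORDER BY age ASC) rn
            |  FROM tmp_view)
            |SELECT customer_id, age FROM t1 WHERE rn = 1""".stripMargin
        ).observe("my_event2", count("*")).as("df2")

        val df3 = spark.range(1, 5).toDF("id").withColumn("zaak_id", lit(1))
          .withColumn("targetid", lit(2)).as("df3")
        val df4 = spark.range(1, 5).toDF("id").withColumn("zaak_id", lit(2)).as("df4")
        val df5 = df4.join(df2, col("df4.id") === col("df2.customer_id"), "inner")
        val df6 = df3.join(df2, col("df3.zaak_id") === col("df2.customer_id"), "outer")

        // Both branches reference the same observed subtree. Before this
        // follow-up, analysis could reject the plan because the occurrences
        // no longer looked identical once alias-only Projects were dropped
        // without remapping their attributes.
        df5.crossJoin(df6).collect()
        spark.stop()
      }
    }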