This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 37028fafc4f9 [SPARK-52079] Normalize order of attributes in inner 
project lists
37028fafc4f9 is described below

commit 37028fafc4f9fc873195a88f0840ab69edcf9d2b
Author: Mihailo Timotic <mihailo.timo...@databricks.com>
AuthorDate: Mon May 12 14:05:00 2025 +0200

    [SPARK-52079] Normalize order of attributes in inner project lists
    
    ### What changes were proposed in this pull request?
    Normalize order of attributes in inner project lists. This PR is 
reintroducing a change made in https://github.com/apache/spark/pull/49230. This 
change was reverted in https://github.com/apache/spark/pull/49460 as it was no 
longer necessary, but is now needed again for a different use case
    
    ### Why are the changes needed?
    Fixed-point analyzer resolves `Sort` and `Having` on top of an `Aggregate` 
via `TempResolvedColumn`. By doing this, fixed-point first extends an 
`Aggregate` with the resolved column and then proceeds with other resolution 
rules. This is a problem when resolving LCAs in fixed-point and single-pass, 
because single-pass adds this missing column after LCA resolution and fixed 
point first adds the column and only then performs LCA resolution. This causes 
plan mismatches where we have diffe [...]
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50858 from mihailotim-db/mihailotim-db/normalize_inner_project.
    
    Authored-by: Mihailo Timotic <mihailo.timo...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/plans/NormalizePlan.scala   | 25 +++++++++++++++-------
 .../sql/catalyst/plans/NormalizePlanSuite.scala    | 20 +++++++++++++++++
 2 files changed, 37 insertions(+), 8 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
index 18339e81b682..bf2b38c45f85 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
@@ -154,15 +154,14 @@ object NormalizePlan extends PredicateHelper {
             
.getTagValue(DeduplicateRelations.PROJECT_FOR_EXPRESSION_ID_DEDUPLICATION)
             .isDefined =>
         project.child
+      case Project(outerProjectList, innerProject: Project) =>
+        val normalizedInnerProjectList = 
normalizeProjectList(innerProject.projectList)
+        val orderedInnerProjectList = normalizedInnerProjectList.sortBy(_.name)
+        val newInnerProject =
+          Project(orderedInnerProjectList, innerProject.child)
+        Project(normalizeProjectList(outerProjectList), newInnerProject)
       case Project(projectList, child) =>
-        val projList = projectList
-          .map { e =>
-            e.transformUp {
-              case g: GetViewColumnByNameAndOrdinal => g.copy(viewDDL = None)
-            }
-          }
-          .asInstanceOf[Seq[NamedExpression]]
-        Project(projList, child)
+        Project(normalizeProjectList(projectList), child)
       case c: KeepAnalyzedQuery => c.storeAnalyzedQuery()
       case localRelation: LocalRelation if !localRelation.data.isEmpty =>
         /**
@@ -198,6 +197,16 @@ object NormalizePlan extends PredicateHelper {
     case LessThanOrEqual(l, r) if l.hashCode() > r.hashCode() => 
GreaterThanOrEqual(r, l)
     case _ => condition // Don't reorder.
   }
+
+  private def normalizeProjectList(projectList: Seq[NamedExpression]): 
Seq[NamedExpression] = {
+    projectList
+      .map { e =>
+        e.transformUp {
+          case g: GetViewColumnByNameAndOrdinal => g.copy(viewDDL = None)
+        }
+      }
+      .asInstanceOf[Seq[NamedExpression]]
+  }
 }
 
 class CteIdNormalizer {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
index d2ac103e14f7..575b10c13349 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
@@ -38,6 +38,26 @@ import org.apache.spark.sql.types.BooleanType
 
 class NormalizePlanSuite extends SparkFunSuite with SQLConfHelper {
 
+  test("Normalize Project") {
+    val baselineCol1 = $"col1".int
+    val testCol1 = baselineCol1.newInstance()
+    val baselinePlan = LocalRelation(baselineCol1).select(baselineCol1)
+    val testPlan = LocalRelation(testCol1).select(testCol1)
+
+    assert(baselinePlan != testPlan)
+    assert(NormalizePlan(baselinePlan) == NormalizePlan(testPlan))
+  }
+
+  test("Normalize ordering in a project list of an inner Project") {
+    val baselinePlan =
+      LocalRelation($"col1".int, $"col2".string).select($"col1", 
$"col2").select($"col1")
+    val testPlan =
+      LocalRelation($"col1".int, $"col2".string).select($"col2", 
$"col1").select($"col1")
+
+    assert(baselinePlan != testPlan)
+    assert(NormalizePlan(baselinePlan) == NormalizePlan(testPlan))
+  }
+
   test("Normalize InheritAnalysisRules expressions") {
     val castWithoutTimezone =
       Cast(child = Literal(1), dataType = BooleanType, ansiEnabled = 
conf.ansiEnabled)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to