This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 967f2b6b2e60 [SPARK-53308][SQL] Don't remove aliases in 
RemoveRedundantAliases that would cause duplicates
967f2b6b2e60 is described below

commit 967f2b6b2e60a9a8e9851284d2d1107036b0f690
Author: mihailoale-db <[email protected]>
AuthorDate: Wed Aug 20 11:14:25 2025 +0800

    [SPARK-53308][SQL] Don't remove aliases in RemoveRedundantAliases that 
would cause duplicates
    
    ### What changes were proposed in this pull request?
    When a `Project`, `Aggregate` or `Window` is a child of `Union`, we 
don't remove an `Alias` that wraps an `Attribute` already present in the 
output set of that operator. Otherwise, removing the alias would leave the 
operator with duplicate attributes in its output. When that happens, `Union` is 
not resolved and the query fails (but it shouldn't).
    Consider this example:
    ```
    SELECT col1 FROM values(1) WHERE 100 IN (SELECT col1 UNION SELECT col1);
    ```
    
    In this PR I propose that we fix the above query (it should pass, but it 
currently fails) by not removing `Alias`es under `Union` whose removal would 
cause duplicates.
    
    ### Why are the changes needed?
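    To fix a valid query pattern that currently fails because `Union` is left 
unresolved after `RemoveRedundantAliases` removes the deduplicating `Alias`.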
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added tests to `RemoveRedundantAliasAndProjectSuite` and `SQLQuerySuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52060 from mihailoale-db/unionduplicates.
    
    Authored-by: mihailoale-db <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 63 ++++++++++++++++++----
 .../RemoveRedundantAliasAndProjectSuite.scala      | 33 +++++++++++-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  6 +++
 3 files changed, 92 insertions(+), 10 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4b149ebfb1f3..ef505a014411 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -640,16 +640,23 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
       case u: Union =>
         var first = true
         plan.mapChildren { child =>
-          if (first) {
-            first = false
-            // `Union` inherits its first child's outputs. We don't remove 
those aliases from the
-            // first child's tree that prevent aliased attributes to appear 
multiple times in the
-            // `Union`'s output. A parent projection node on the top of an 
`Union` with non-unique
-            // output attributes could return incorrect result.
-            removeRedundantAliases(child, excluded ++ child.outputSet)
+          if (!conf.unionIsResolvedWhenDuplicatesPerChildResolved || 
shouldRemoveAliasesUnderUnion(
+            child
+          )) {
+            if (first) {
+              first = false
+              // `Union` inherits its first child's outputs. We don't remove 
those aliases from the
+              // first child's tree that prevent aliased attributes to appear 
multiple times in the
+              // `Union`'s output. A parent projection node on the top of an 
`Union` with
+              // non-unique output attributes could return incorrect result.
+              removeRedundantAliases(child, excluded ++ child.outputSet)
+            } else {
+              // We don't need to exclude those attributes that `Union` 
inherits from its first
+              // child.
+              removeRedundantAliases(child, excluded -- 
u.children.head.outputSet)
+            }
           } else {
-            // We don't need to exclude those attributes that `Union` inherits 
from its first child.
-            removeRedundantAliases(child, excluded -- 
u.children.head.outputSet)
+            child
           }
         }
 
@@ -694,6 +701,44 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
     }
   }
 
+  /**
+   * In case a [[Project]], [[Aggregate]] or [[Window]] is a child of 
[[Union]], we don't remove an
+   * [[Alias]] in case it is on top of an [[Attribute]] which exists in the 
output set of the
+   * operator. This is needed because otherwise, we end up having an operator 
with duplicates in
+   * its output. When that happens, [[Union]] is not resolved, and we fail 
(but we shouldn't).
+   * In this example:
+   *
+   * {{{ SELECT col1 FROM values(1) WHERE 100 IN (SELECT col1 UNION SELECT 
col1); }}}
+   *
+   * Without `shouldRemoveAliasesUnderUnion` check, we would remove the 
[[Alias]] introduced in
+   * [[DeduplicateRelations]] rule (in a [[Project]] tagged as
+   * `PROJECT_FOR_EXPRESSION_ID_DEDUPLICATION`), the result is unresolved 
[[Union]] which causes the
+   * failure. With the check, [[Alias]] stays, and we resolve the plan 
properly.
+   */
+  private def shouldRemoveAliasesUnderUnion(operator: LogicalPlan): Boolean = {
+    operator match {
+      case project: Project =>
+        project.projectList.forall {
+          case Alias(attribute: Attribute, _) =>
+            !project.outputSet.contains(attribute)
+          case _ => true
+        }
+      case aggregate: Aggregate =>
+        aggregate.aggregateExpressions.forall {
+          case Alias(attribute: Attribute, _) =>
+            !aggregate.outputSet.contains(attribute)
+          case _ => true
+        }
+      case window: Window =>
+        window.windowExpressions.forall {
+          case Alias(attribute: Attribute, _) =>
+            !window.outputSet.contains(attribute)
+          case _ => true
+        }
+      case other => true
+    }
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = removeRedundantAliases(plan, 
AttributeSet.empty)
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index 552a638f6e61..fec683183100 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.types.{IntegerType, MetadataBuilder}
 
 class RemoveRedundantAliasAndProjectSuite extends PlanTest {
 
@@ -238,4 +238,35 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest 
{
       comparePlans(optimized, expectedWhenNotEnabled)
     }
   }
+
+  test("SPARK-53308: Don't remove aliases in RemoveRedundantAliases that would 
cause duplicates") {
+    val exprId = NamedExpression.newExprId
+    val attribute = AttributeReference("attr", IntegerType)(exprId = exprId)
+    val project = Project(
+      Seq(
+        Alias(attribute, "attr")(),
+        attribute
+      ),
+      LocalRelation(attribute)
+    )
+    val projectWithoutAlias =
+      Project(
+        Seq(
+          attribute,
+          attribute
+        ),
+        LocalRelation(attribute)
+      )
+    val union = Union(Seq(project, project))
+
+    
withSQLConf(SQLConf.UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED.key -> 
"true") {
+      val optimized = Optimize.execute(union)
+      comparePlans(union, optimized)
+    }
+
+    
withSQLConf(SQLConf.UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED.key -> 
"false") {
+      val optimized = Optimize.execute(union)
+      comparePlans(optimized, Union(Seq(project, projectWithoutAlias)))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 905f34cd7d34..89a6a12a7e4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -5073,6 +5073,12 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 
     checkAnswer(df, Seq(Row(null, null, 820), Row(null, "east", 420), Row("a", 
null, 370)))
   }
+
+  test("SPARK-53308: Don't remove aliases in RemoveRedundantAliases that would 
cause duplicates") {
+    val df = sql("SELECT col1 FROM values(1) WHERE 1 IN (SELECT col1 UNION 
SELECT col1);")
+
+    checkAnswer(df, Row(1))
+  }
 }
 
 case class Foo(bar: Option[String])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to