This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 89fd0827015d [SPARK-55854][SQL] Tag pass-through duplicate attributes in Expand output to prevent AMBIGUOUS_REFERENCE
89fd0827015d is described below

commit 89fd0827015d1771ec8d3a622ab39c92446e2366
Author: Mihailo Timotic <[email protected]>
AuthorDate: Mon Mar 9 19:26:02 2026 +0800

    [SPARK-55854][SQL] Tag pass-through duplicate attributes in Expand output to prevent AMBIGUOUS_REFERENCE
    
    ### What changes were proposed in this pull request?
    
    When `Expand` is created for `ROLLUP`/`CUBE`/`GROUPING SETS`, its output contains duplicate-named attributes: the original pass-through child attribute (e.g., `a#0`) and a new grouping instance created via `newInstance()` (e.g., `a#5`). Both share the same name, which causes `AMBIGUOUS_REFERENCE` errors when any operator performs name-based resolution against the `Expand` output.
    
    This PR tags pass-through child attributes with `__is_duplicate` metadata in `Expand.apply()`, so that `AttributeSeq.getCandidatesForResolution` deprioritizes them when multiple candidates match by name. This is the same mechanism already used by `DeduplicateUnionChildOutput` for Union operators.
    
    Only attributes whose `ExprId` matches a simple `Attribute` child of a `groupByAlias` are tagged. Complex grouping expressions (e.g., `c1 + 1`) produce aliases with different names than any child column, so no name conflict arises. ExprId-based resolution (used for already-resolved expressions like aggregate functions) is unaffected.
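    
    The tagging and deprioritization described above can be sketched with a
    toy model. This is only an illustration under stated assumptions: `Attr`,
    `tagPassthrough` and `resolve` are hypothetical stand-ins and not Catalyst
    classes, and the behavior when every candidate is tagged is not modeled.
    
    ```scala
    // Toy model only: hypothetical stand-ins, not Spark/Catalyst types.
    final case class Attr(name: String, exprId: Long, isDuplicate: Boolean = false)
    
    object ExpandResolutionSketch {
      // Tag child attributes whose exprId belongs to a simple-attribute grouping alias.
      def tagPassthrough(childOutput: Seq[Attr], groupedExprIds: Set[Long]): Seq[Attr] =
        childOutput.map { a =>
          if (groupedExprIds.contains(a.exprId)) a.copy(isDuplicate = true) else a
        }
    
      // Name-based lookup: when several candidates match, tagged ones are dropped first.
      def resolve(output: Seq[Attr], name: String): Either[String, Attr] = {
        val candidates = output.filter(_.name == name)
        val pruned =
          if (candidates.size > 1) candidates.filterNot(_.isDuplicate) else candidates
        pruned match {
          case Seq(single) => Right(single)
          case Seq() => Left(s"unresolved: $name")
          case _ => Left(s"AMBIGUOUS_REFERENCE: $name")
        }
      }
    
      def main(args: Array[String]): Unit = {
        val child = Seq(Attr("a", 0L), Attr("b", 1L), Attr("c", 2L))
        val grouping = Attr("a", 5L) // stands in for the newInstance() copy
        val gid = Attr("gid", 3L)
    
        val untagged = child ++ Seq(grouping, gid)
        val tagged = tagPassthrough(child, Set(0L)) ++ Seq(grouping, gid)
    
        // Two untagged candidates named "a" are ambiguous.
        assert(resolve(untagged, "a").isLeft)
        // With tagging, the grouping instance wins.
        assert(resolve(tagged, "a") == Right(grouping))
        // Lookup by exprId still finds the tagged pass-through attribute.
        assert(tagged.exists(a => a.exprId == 0L && a.isDuplicate))
      }
    }
    ```
    
    In this model only the name-based path inspects the tag, which is why
    lookups keyed on the expression ID behave the same before and after tagging.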
    
    The fix is guarded behind a new internal config `spark.sql.analyzer.expandTagPassthroughDuplicates` (default `true`).
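    
    A minimal sketch of switching the flag off at runtime, for example to
    compare analyzer output with and without tagging. It assumes a build that
    contains this commit and a `spark-shell`-style script context; the local
    session setup is illustrative and not part of this PR.
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Illustrative local session; any existing SparkSession works the same way.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("expand-tagging-flag")
      .getOrCreate()
    
    // Disable tagging so Expand emits the untagged child output again.
    spark.conf.set("spark.sql.analyzer.expandTagPassthroughDuplicates", "false")
    ```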
    
    ### Why are the changes needed?
    
    The `Expand` operator for `ROLLUP`/`CUBE`/`GROUPING SETS` produces an output like `[a#0, b#1, c#2, a#5, gid#3]` where `a#0` is the pass-through child attribute and `a#5` is the new grouping attribute. Both have the name `"a"`. When any operator above the `Expand` resolves the reference `"a"` by name (e.g., a `Filter` or `Project` inserted by a custom analysis rule, or a correlated subquery whose outer reference resolves against the `Expand`'s output), `getCandidatesForResolution` retu [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The fix prevents a latent `AMBIGUOUS_REFERENCE` error in name-based resolution against `Expand` output. Standard SQL queries are not affected because the `Aggregate` above the `Expand` already shields upper operators from seeing the duplicate names. The fix is defensive and makes the `Expand` output safe for any future feature or custom rule that may resolve names against it.
    
    ### How was this patch tested?
    
    7 new unit tests in `ResolveGroupingAnalyticsSuite`:
    
    - **Tagging behavior (flag enabled, default):**
      - Tags pass-through attribute for simple single-column grouping (`ROLLUP(a)`)
      - Does not tag for complex grouping expressions (`ROLLUP(a + 1)`)
      - Tags multiple pass-through attributes for multi-column grouping (`ROLLUP(a, b)`)
      - Preserves `ExprId` and name on tagged attributes
      - Demonstrates that `resolve("a")` succeeds with tagging and throws `AMBIGUOUS_REFERENCE` without tagging
    
    - **Flag disabled behavior:**
      - No tagging for single-column grouping; `resolve("a")` throws `AMBIGUOUS_REFERENCE`
      - No tagging for multi-column grouping; `resolve("a")` and `resolve("b")` both throw `AMBIGUOUS_REFERENCE`
    
    All 9 pre-existing tests in `ResolveGroupingAnalyticsSuite` continue to pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude claude-4.6-opus-high-thinking (Cursor)
    
    Closes #54641 from mihailotim-db/mihailo-timotic_data/expand_qualify.
    
    Authored-by: Mihailo Timotic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../plans/logical/basicLogicalOperators.scala      |  55 ++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  11 ++
 .../analysis/ResolveGroupingAnalyticsSuite.scala   | 182 +++++++++++++++++++++
 3 files changed, 246 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 3fa7a4f4685e..c18b7fcecc48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1420,14 +1420,65 @@ object Expand {
       }
     }
 
+    val taggedChildOutput =
+      if (SQLConf.get.getConf(SQLConf.EXPAND_TAG_PASSTHROUGH_DUPLICATES_ENABLED)) {
+        tagPassthroughDuplicates(childOutput, groupByAliases)
+      } else {
+        childOutput
+      }
+
     val output = if (hasDuplicateGroupingSets) {
       val gpos = AttributeReference("_gen_grouping_pos", IntegerType, false)()
-      childOutput ++ groupByAttrs.map(_.newInstance()) :+ gid :+ gpos
+      taggedChildOutput ++ groupByAttrs.map(_.newInstance()) :+ gid :+ gpos
     } else {
-      childOutput ++ groupByAttrs.map(_.newInstance()) :+ gid
+      taggedChildOutput ++ groupByAttrs.map(_.newInstance()) :+ gid
     }
     Expand(projections, output, Project(childOutput ++ groupByAliases, child))
   }
+
+  /**
+   * Tags child output attributes that will be duplicated in the Expand output with
+   * `__is_duplicate` metadata.
+   *
+   * When a `groupByAlias` wraps a simple attribute (e.g., `Alias(c1#0, "c1")`), the
+   * Expand output contains both the pass-through child attribute (`c1#0`) and a new
+   * grouping instance created via `newInstance()` (e.g., `c1#5`). Both share the same
+   * name, which causes `AMBIGUOUS_REFERENCE` errors during name-based resolution.
+   *
+   * By tagging the pass-through copy with `__is_duplicate`,
+   * `AttributeSeq.getCandidatesForResolution` filters it out when multiple candidates
+   * match by name, allowing the produced grouping attribute to be resolved instead.
+   * ExprId-based resolution (used for already-resolved expressions like aggregate
+   * functions) is unaffected by this metadata.
+   *
+   * Only child attributes whose ExprId matches a `groupByAlias` child that is a simple
+   * `Attribute` are tagged. Complex grouping expressions (e.g., `c1 + 1`) produce
+   * aliases with different names than any child column, so no name conflict arises.
+   */
+  private def tagPassthroughDuplicates(
+      childOutput: Seq[Attribute],
+      groupByAliases: Seq[Alias]): Seq[Attribute] = {
+    val duplicatedExprIds = new java.util.HashSet[ExprId](groupByAliases.size)
+    groupByAliases.foreach {
+      case Alias(attr: Attribute, _) => duplicatedExprIds.add(attr.exprId)
+      case _ =>
+    }
+
+    if (!duplicatedExprIds.isEmpty) {
+      childOutput.map { attr =>
+        if (duplicatedExprIds.contains(attr.exprId)) {
+          attr.withMetadata(new MetadataBuilder()
+            .withMetadata(attr.metadata)
+            .putNull("__is_duplicate")
+            .build())
+        } else {
+          attr
+        }
+      }
+    } else {
+      childOutput
+    }
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 060ee811dd9c..84f2f6b90aa7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -271,6 +271,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val EXPAND_TAG_PASSTHROUGH_DUPLICATES_ENABLED =
+    buildConf("spark.sql.analyzer.expandTagPassthroughDuplicates")
+      .internal()
+      .version("4.2.0")
+      .doc(
+        "When true, Expand tags pass-through child attributes that share a name with a " +
+        "grouping attribute using __is_duplicate metadata, so that name-based resolution " +
+        "against the Expand output does not produce AMBIGUOUS_REFERENCE errors.")
+      .booleanConf
+      .createWithDefault(true)
+
   val ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS =
     buildConf("spark.sql.analyzer.uniqueNecessaryMetadataColumns")
       .internal()
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala
index d075e5f50e50..ceb68ab0c92b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.TimeZone
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 class ResolveGroupingAnalyticsSuite extends AnalysisTest {
@@ -279,6 +281,186 @@ class ResolveGroupingAnalyticsSuite extends AnalysisTest {
       Seq("grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup"))
   }
 
+  test("Expand tags pass-through duplicates for simple attribute grouping") {
+    val groupByAliases = Seq(Alias(a, "a")())
+    val groupByAttrs = groupByAliases.map(_.toAttribute)
+    val gidAttr = AttributeReference(VirtualColumn.groupingIdName, GroupingID.dataType, false)()
+    val groupingSetsAttrs = BaseGroupingSets.rollupExprs(Seq(Seq(groupByAttrs.head)))
+      .map(_.map(e => groupByAttrs.find(_.semanticEquals(e)).get))
+
+    val expand = Expand(groupingSetsAttrs, groupByAliases, groupByAttrs, gidAttr, r1)
+    val childOutputInExpand = expand.output.take(r1.output.length)
+
+    assert(childOutputInExpand.head.metadata.contains("__is_duplicate"),
+      "pass-through attribute 'a' should be tagged with __is_duplicate")
+    assert(!childOutputInExpand(1).metadata.contains("__is_duplicate"),
+      "non-grouped attribute 'b' should not be tagged")
+    assert(!childOutputInExpand(2).metadata.contains("__is_duplicate"),
+      "non-grouped attribute 'c' should not be tagged")
+  }
+
+  test("Expand does not tag pass-through for complex grouping expressions") {
+    val complexExpr = a + Literal(1)
+    val groupByAliases = Seq(Alias(complexExpr, "(a + 1)")())
+    val groupByAttrs = groupByAliases.map(_.toAttribute)
+    val gidAttr = AttributeReference(VirtualColumn.groupingIdName, GroupingID.dataType, false)()
+    val groupingSetsAttrs = BaseGroupingSets.rollupExprs(Seq(Seq(groupByAttrs.head)))
+      .map(_.map(e => groupByAttrs.find(_.semanticEquals(e)).get))
+
+    val expand = Expand(groupingSetsAttrs, groupByAliases, groupByAttrs, gidAttr, r1)
+    val childOutputInExpand = expand.output.take(r1.output.length)
+
+    childOutputInExpand.foreach { attr =>
+      assert(!attr.metadata.contains("__is_duplicate"),
+        s"attribute '${attr.name}' should not be tagged for complex grouping expression")
+    }
+  }
+
+  test("Expand tags multiple pass-through duplicates for multi-column grouping") {
+    val groupByAliases = Seq(Alias(a, "a")(), Alias(b, "b")())
+    val groupByAttrs = groupByAliases.map(_.toAttribute)
+    val gidAttr = AttributeReference(VirtualColumn.groupingIdName, GroupingID.dataType, false)()
+    val groupingSetsAttrs =
+      BaseGroupingSets.rollupExprs(groupByAttrs.map(Seq(_)))
+        .map(_.map(e => groupByAttrs.find(_.semanticEquals(e)).get))
+
+    val expand = Expand(groupingSetsAttrs, groupByAliases, groupByAttrs, gidAttr, r1)
+    val childOutputInExpand = expand.output.take(r1.output.length)
+
+    assert(childOutputInExpand.head.metadata.contains("__is_duplicate"),
+      "pass-through attribute 'a' should be tagged")
+    assert(childOutputInExpand(1).metadata.contains("__is_duplicate"),
+      "pass-through attribute 'b' should be tagged")
+    assert(!childOutputInExpand(2).metadata.contains("__is_duplicate"),
+      "non-grouped attribute 'c' should not be tagged")
+  }
+
+  test("Expand tagged duplicates preserve ExprId") {
+    val groupByAliases = Seq(Alias(a, "a")())
+    val groupByAttrs = groupByAliases.map(_.toAttribute)
+    val gidAttr = AttributeReference(VirtualColumn.groupingIdName, GroupingID.dataType, false)()
+    val groupingSetsAttrs = BaseGroupingSets.rollupExprs(Seq(Seq(groupByAttrs.head)))
+      .map(_.map(e => groupByAttrs.find(_.semanticEquals(e)).get))
+
+    val expand = Expand(groupingSetsAttrs, groupByAliases, groupByAttrs, gidAttr, r1)
+    val taggedAttr = expand.output.head
+
+    assert(taggedAttr.exprId == a.exprId,
+      "tagged attribute should preserve the original ExprId")
+    assert(taggedAttr.name == a.name,
+      "tagged attribute should preserve the original name")
+  }
+
+  test("Expand pass-through tagging prevents AMBIGUOUS_REFERENCE on name-based resolution") {
+    // The Expand for ROLLUP(a) produces an output with two attributes named "a":
+    // the pass-through child attribute (a#original) and the new grouping instance
+    // (a#new). Any operator that resolves "a" by name against this output would see
+    // two candidates and throw AMBIGUOUS_REFERENCE.
+    //
+    // This can happen when an operator sits directly above the Expand and contains an
+    // unresolved reference -- for example, a Filter or Project inserted between the
+    // Expand and its parent Aggregate by a custom analysis rule, or a correlated
+    // subquery whose outer reference resolves against the Expand's output.
+    val groupByAliases = Seq(Alias(a, "a")())
+    val groupByAttrs = groupByAliases.map(_.toAttribute)
+    val gidAttr = AttributeReference(VirtualColumn.groupingIdName, GroupingID.dataType, false)()
+    val groupingSetsAttrs = BaseGroupingSets.rollupExprs(Seq(Seq(groupByAttrs.head)))
+      .map(_.map(e => groupByAttrs.find(_.semanticEquals(e)).get))
+
+    val expand = Expand(groupingSetsAttrs, groupByAliases, groupByAttrs, gidAttr, r1)
+    assert(expand.output.count(_.name == "a") == 2,
+      "Expand output should have 2 attributes named 'a'")
+
+    // With __is_duplicate tagging (the fix), resolve() returns a single result.
+    val resolved = expand.output.resolve(Seq("a"), caseSensitiveResolution)
+    assert(resolved.isDefined, "should resolve 'a' successfully with tagging")
+    assert(!resolved.get.toAttribute.metadata.contains("__is_duplicate"),
+      "resolved attribute should be the grouping instance, not the tagged pass-through")
+
+    // Without tagging, resolve() throws AMBIGUOUS_REFERENCE because both
+    // candidates match and neither is deprioritized.
+    val untaggedOutput = expand.output.map { attr =>
+      if (attr.metadata.contains("__is_duplicate")) {
+        attr.withMetadata(Metadata.empty)
+      } else {
+        attr
+      }
+    }
+    checkError(
+      exception = intercept[AnalysisException] {
+        untaggedOutput.resolve(Seq("a"), caseSensitiveResolution)
+      },
+      condition = "AMBIGUOUS_REFERENCE",
+      parameters = Map("name" -> "`a`", "referenceNames" -> "[`a`, `a`]")
+    )
+  }
+
+  test("Expand does not tag pass-through duplicates when flag is disabled") {
+    withSQLConf(
+      SQLConf.EXPAND_TAG_PASSTHROUGH_DUPLICATES_ENABLED.key -> "false") {
+      val groupByAliases = Seq(Alias(a, "a")())
+      val groupByAttrs = groupByAliases.map(_.toAttribute)
+      val gidAttr = AttributeReference(VirtualColumn.groupingIdName, GroupingID.dataType, false)()
+      val groupingSetsAttrs = BaseGroupingSets.rollupExprs(Seq(Seq(groupByAttrs.head)))
+        .map(_.map(e => groupByAttrs.find(_.semanticEquals(e)).get))
+
+      val expand = Expand(groupingSetsAttrs, groupByAliases, groupByAttrs, gidAttr, r1)
+      val childOutputInExpand = expand.output.take(r1.output.length)
+
+      childOutputInExpand.foreach { attr =>
+        assert(!attr.metadata.contains("__is_duplicate"),
+          s"attribute '${attr.name}' should not be tagged when flag is disabled")
+      }
+
+      assert(expand.output.count(_.name == "a") == 2)
+      checkError(
+        exception = intercept[AnalysisException] {
+          expand.output.resolve(Seq("a"), caseSensitiveResolution)
+        },
+        condition = "AMBIGUOUS_REFERENCE",
+        parameters = Map("name" -> "`a`", "referenceNames" -> "[`a`, `a`]")
+      )
+    }
+  }
+
+  test("Expand does not tag multi-column pass-through duplicates when flag is disabled") {
+    withSQLConf(
+      SQLConf.EXPAND_TAG_PASSTHROUGH_DUPLICATES_ENABLED.key -> "false") {
+      val groupByAliases = Seq(Alias(a, "a")(), Alias(b, "b")())
+      val groupByAttrs = groupByAliases.map(_.toAttribute)
+      val gidAttr = AttributeReference(VirtualColumn.groupingIdName, GroupingID.dataType, false)()
+      val groupingSetsAttrs =
+        BaseGroupingSets.rollupExprs(groupByAttrs.map(Seq(_)))
+          .map(_.map(e => groupByAttrs.find(_.semanticEquals(e)).get))
+
+      val expand = Expand(groupingSetsAttrs, groupByAliases, groupByAttrs, gidAttr, r1)
+      val childOutputInExpand = expand.output.take(r1.output.length)
+
+      childOutputInExpand.foreach { attr =>
+        assert(!attr.metadata.contains("__is_duplicate"),
+          s"attribute '${attr.name}' should not be tagged when flag is disabled")
+      }
+
+      assert(expand.output.count(_.name == "a") == 2)
+      checkError(
+        exception = intercept[AnalysisException] {
+          expand.output.resolve(Seq("a"), caseSensitiveResolution)
+        },
+        condition = "AMBIGUOUS_REFERENCE",
+        parameters = Map("name" -> "`a`", "referenceNames" -> "[`a`, `a`]")
+      )
+
+      assert(expand.output.count(_.name == "b") == 2)
+      checkError(
+        exception = intercept[AnalysisException] {
+          expand.output.resolve(Seq("b"), caseSensitiveResolution)
+        },
+        condition = "AMBIGUOUS_REFERENCE",
+        parameters = Map("name" -> "`b`", "referenceNames" -> "[`b`, `b`]")
+      )
+    }
+  }
+
   test("sort with grouping function") {
     // Sort with Grouping function
     val originalPlan = Sort(

