This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 71a046b73abe [SPARK-52157][SQL] Use earlier computed name for 
OuterReference in single-pass resolver
71a046b73abe is described below

commit 71a046b73abe7ac244afa4cd09d80dffb133509d
Author: mihailoale-db <mihailo.alek...@databricks.com>
AuthorDate: Fri May 16 11:20:14 2025 +0200

    [SPARK-52157][SQL] Use earlier computed name for OuterReference in 
single-pass resolver
    
    ### What changes were proposed in this pull request?
    In the following query we would have min(outer(t2.t2a)) as the name for the 
min(t2a) expression.
    
    ```
    SELECT t1a
    FROM   t1
    WHERE  t1a IN (SELECT t2a
                   FROM   t2
                   WHERE  EXISTS (SELECT min(t2a)
                                  FROM   t3))
    ```
    
    This is a compatibility problem between the single-pass resolver and the 
fixed-point analyzer, because in single-pass, names are generated after we finish 
resolving the aggregate expression min(t2a) (in a bottom-up manner), and at that 
point an OuterReference is already wrapped around the aggregate expression (so the 
name looks like outer(min(t2a))).
    In this PR I propose that we store the name earlier (in the form of a tag) 
and use it when needed.
    
    ### Why are the changes needed?
    To make single-pass and fixed-point compatible.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50903 from mihailoale-db/outerreferencetag.
    
    Authored-by: mihailoale-db <mihailo.alek...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../resolver/AggregateExpressionResolver.scala     | 42 ++++++++++++++--------
 .../catalyst/expressions/namedExpressions.scala    | 26 ++++++++++++--
 2 files changed, 52 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
index 63642e3ab1f1..c13a01115429 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, 
OuterReference, SubExprUtils}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Sort}
+import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
@@ -139,7 +140,9 @@ class AggregateExpressionResolver(
    *  - Alias this subtree and put it inside the current [[SubqueryScope]];
    *  - If outer aggregates are allowed, replace the [[AggregateExpression]] 
with an
    *    [[OuterReference]] to the auto-generated [[Alias]] that we created. 
This alias will later
-   *    be injected into the outer [[Aggregate]];
+   *    be injected into the outer [[Aggregate]]; We store the name that needs 
to be used for the
+   *    [[OuterReference]] in 
[[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] computed based on
+   *    the [[AggregateExpression]] without [[OuterReference]] pulled out.
    *  - In case we have an [[AggregateExpression]] inside a [[Sort]] operator, 
we need to handle it
    *    in a special way (see [[handleAggregateExpressionInSort]] for more 
details).
    *  - Return the original [[AggregateExpression]] otherwise. This is done to 
stay compatible
@@ -155,21 +158,32 @@ class AggregateExpressionResolver(
       )
     }
 
-    if (subqueryRegistry.currentScope.isOuterAggregateAllowed) {
-      val aggregateExpressionWithStrippedOuterReferences =
-        SubExprUtils.stripOuterReference(aggregateExpression)
+    val resolvedOuterAggregateExpression =
+      if (subqueryRegistry.currentScope.isOuterAggregateAllowed) {
+        val aggregateExpressionWithStrippedOuterReferences =
+          SubExprUtils.stripOuterReference(aggregateExpression)
 
-      val outerAggregateExpressionAlias = 
autoGeneratedAliasProvider.newOuterAlias(
-        child = aggregateExpressionWithStrippedOuterReferences
-      )
-      subqueryRegistry.currentScope.addOuterAggregateExpression(
-        outerAggregateExpressionAlias,
-        aggregateExpressionWithStrippedOuterReferences
-      )
+        val outerAggregateExpressionAlias = 
autoGeneratedAliasProvider.newOuterAlias(
+          child = aggregateExpressionWithStrippedOuterReferences
+        )
+        subqueryRegistry.currentScope.addOuterAggregateExpression(
+          outerAggregateExpressionAlias,
+          aggregateExpressionWithStrippedOuterReferences
+        )
 
-      OuterReference(outerAggregateExpressionAlias.toAttribute)
-    } else {
-      aggregateExpression
+        OuterReference(outerAggregateExpressionAlias.toAttribute)
+      } else {
+        aggregateExpression
+      }
+
+    resolvedOuterAggregateExpression match {
+      case outerReference: OuterReference =>
+        outerReference.setTagValue(
+          OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE,
+          toPrettySQL(aggregateExpression)
+        )
+        outerReference
+      case other => other
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 1d0082582114..0bfba04d6372 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.trees.TreePattern
+import org.apache.spark.sql.catalyst.trees.{TreeNodeTag, TreePattern}
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, 
METADATA_COL_ATTR_KEY}
@@ -443,7 +443,13 @@ case class OuterReference(e: NamedExpression)
   override def nullable: Boolean = e.nullable
   override def prettyName: String = "outer"
 
-  override def sql: String = s"$prettyName(${e.sql})"
+  override def sql: String =
+    getTagValue(OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE) match {
+      case Some(name) =>
+        name
+      case None =>
+        s"$prettyName(${e.sql})"
+    }
   override def name: String = e.name
   override def qualifier: Seq[String] = e.qualifier
   override def exprId: ExprId = e.exprId
@@ -510,6 +516,22 @@ case class LateralColumnAliasReference(ne: 
NamedExpression, nameParts: Seq[Strin
   final override val nodePatterns: Seq[TreePattern] = 
Seq(LATERAL_COLUMN_ALIAS_REFERENCE)
 }
 
+object OuterReference {
+
+  /**
+   * In fixed-point [[OuterReference]] is extracted in 
[[UpdateOuterReferences]] which is invoked
+   * after the alias assignment ([[ResolveAliases]]) which is opposite of how 
it is done in the
+   * single-pass implementation (first we extract the [[OuterReference]] and 
then assign the name -
+   * in bottom-up manner).
+   *
+   * In order to make single-pass and fixed-point implementations compatible 
use earlier computed
+   * name (if defined) for [[OuterReference]] (defined in
+   * `AggregateExpressionResolver.handleOuterAggregateExpression`).
+   */
+  val SINGLE_PASS_SQL_STRING_OVERRIDE =
+    TreeNodeTag[String]("single_pass_sql_string_override")
+}
+
 object VirtualColumn {
   // The attribute name used by Hive, which has different result than Spark, 
deprecated.
   val hiveGroupingIdName: String = "grouping__id"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to