This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 71a046b73abe [SPARK-52157][SQL] Use earlier computed name for OuterReference in single-pass resolver 71a046b73abe is described below commit 71a046b73abe7ac244afa4cd09d80dffb133509d Author: mihailoale-db <mihailo.alek...@databricks.com> AuthorDate: Fri May 16 11:20:14 2025 +0200 [SPARK-52157][SQL] Use earlier computed name for OuterReference in single-pass resolver ### What changes were proposed in this pull request? In the following query we would have min(outer(t2.t2a)) as the name for the min(t2a) expression. ``` SELECT t1a FROM t1 WHERE t1a IN (SELECT t2a FROM t2 WHERE EXISTS (SELECT min(t2a) FROM t3)) ``` This is a problem in compatibility between the single-pass resolver and the fixed-point analyzer because names in single-pass are generated after we finish resolution of the aggregate expression min(t2a) (in a bottom-up manner), and at that point we have an OuterReference wrapped around the aggregate expression (so the name looks like outer(min(t2a))). In this PR I propose that we store the name earlier (in the form of a tag) and use it when needed. ### Why are the changes needed? To make single-pass and fixed-point compatible. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50903 from mihailoale-db/outerreferencetag. 
Authored-by: mihailoale-db <mihailo.alek...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../resolver/AggregateExpressionResolver.scala | 42 ++++++++++++++-------- .../catalyst/expressions/namedExpressions.scala | 26 ++++++++++++-- 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala index 63642e3ab1f1..c13a01115429 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, OuterReference, SubExprUtils} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Sort} +import org.apache.spark.sql.catalyst.util.toPrettySQL import org.apache.spark.sql.errors.QueryCompilationErrors /** @@ -139,7 +140,9 @@ class AggregateExpressionResolver( * - Alias this subtree and put it inside the current [[SubqueryScope]]; * - If outer aggregates are allowed, replace the [[AggregateExpression]] with an * [[OuterReference]] to the auto-generated [[Alias]] that we created. This alias will later - * be injected into the outer [[Aggregate]]; + * be injected into the outer [[Aggregate]]; We store the name that needs to be used for the + * [[OuterReference]] in [[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] computed based on + * the [[AggregateExpression]] without [[OuterReference]] pulled out. 
* - In case we have an [[AggregateExpression]] inside a [[Sort]] operator, we need to handle it * in a special way (see [[handleAggregateExpressionInSort]] for more details). * - Return the original [[AggregateExpression]] otherwise. This is done to stay compatible @@ -155,21 +158,32 @@ class AggregateExpressionResolver( ) } - if (subqueryRegistry.currentScope.isOuterAggregateAllowed) { - val aggregateExpressionWithStrippedOuterReferences = - SubExprUtils.stripOuterReference(aggregateExpression) + val resolvedOuterAggregateExpression = + if (subqueryRegistry.currentScope.isOuterAggregateAllowed) { + val aggregateExpressionWithStrippedOuterReferences = + SubExprUtils.stripOuterReference(aggregateExpression) - val outerAggregateExpressionAlias = autoGeneratedAliasProvider.newOuterAlias( - child = aggregateExpressionWithStrippedOuterReferences - ) - subqueryRegistry.currentScope.addOuterAggregateExpression( - outerAggregateExpressionAlias, - aggregateExpressionWithStrippedOuterReferences - ) + val outerAggregateExpressionAlias = autoGeneratedAliasProvider.newOuterAlias( + child = aggregateExpressionWithStrippedOuterReferences + ) + subqueryRegistry.currentScope.addOuterAggregateExpression( + outerAggregateExpressionAlias, + aggregateExpressionWithStrippedOuterReferences + ) - OuterReference(outerAggregateExpressionAlias.toAttribute) - } else { - aggregateExpression + OuterReference(outerAggregateExpressionAlias.toAttribute) + } else { + aggregateExpression + } + + resolvedOuterAggregateExpression match { + case outerReference: OuterReference => + outerReference.setTagValue( + OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE, + toPrettySQL(aggregateExpression) + ) + outerReference + case other => other } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 1d0082582114..0bfba04d6372 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark -import org.apache.spark.sql.catalyst.trees.TreePattern +import org.apache.spark.sql.catalyst.trees.{TreeNodeTag, TreePattern} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY} @@ -443,7 +443,13 @@ case class OuterReference(e: NamedExpression) override def nullable: Boolean = e.nullable override def prettyName: String = "outer" - override def sql: String = s"$prettyName(${e.sql})" + override def sql: String = + getTagValue(OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE) match { + case Some(name) => + name + case None => + s"$prettyName(${e.sql})" + } override def name: String = e.name override def qualifier: Seq[String] = e.qualifier override def exprId: ExprId = e.exprId @@ -510,6 +516,22 @@ case class LateralColumnAliasReference(ne: NamedExpression, nameParts: Seq[Strin final override val nodePatterns: Seq[TreePattern] = Seq(LATERAL_COLUMN_ALIAS_REFERENCE) } +object OuterReference { + + /** + * In fixed-point [[OuterReference]] is extracted in [[UpdateOuterReferences]] which is invoked + * after the alias assignment ([[ResolveAliases]]) which is opposite of how it is done in the + * single-pass implementation (first we extract the [[OuterReference]] and then assign the name - + * in bottom-up manner). 
+ * + * In order to make single-pass and fixed-point implementations compatible use earlier computed + * name (if defined) for [[OuterReference]] (defined in + * `AggregateExpressionResolver.handleOuterAggregateExpression`). + */ + val SINGLE_PASS_SQL_STRING_OVERRIDE = + TreeNodeTag[String]("single_pass_sql_string_override") +} + object VirtualColumn { // The attribute name used by Hive, which has different result than Spark, deprecated. val hiveGroupingIdName: String = "grouping__id" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org