This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 71f97490afb0 [SPARK-47241][SQL][FOLLOWUP] Fix issue when laterally referencing a `Generator` 71f97490afb0 is described below commit 71f97490afb029997100147429903b9e41812a60 Author: Mihailo Timotic <mihailo.timo...@databricks.com> AuthorDate: Thu Mar 20 13:28:03 2025 +0800 [SPARK-47241][SQL][FOLLOWUP] Fix issue when laterally referencing a `Generator` ### What changes were proposed in this pull request? Fix issue when laterally referencing a `Generator`. ### Why are the changes needed? Fix the following query pattern: ``` WITH cte AS (SELECT EXPLODE(ARRAY(1, 2, 3)) AS c1, c1) SELECT * FROM cte ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a test case to `LateralColumnAliasSuite` ### Was this patch authored or co-authored using generative AI tooling? No Closes #50310 from mihailotim-db/mihailotim-db/generator_lca. Authored-by: Mihailo Timotic <mihailo.timo...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 62c06692e47ef4b8488ea5d8ecae7c708f0ee808) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 25 +++-------- .../catalyst/analysis/ColumnResolutionHelper.scala | 9 +++- .../apache/spark/sql/LateralColumnAliasSuite.scala | 51 +++++++--------------- 3 files changed, 31 insertions(+), 54 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 8adc3b3dd4ab..2e05024e8690 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2983,20 +2983,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } } - // We must wait until all expressions except for generator functions are resolved before - // rewriting generator functions in Project/Aggregate. This is necessary to make this rule - // stable for different execution orders of analyzer rules. See also SPARK-47241. - private def canRewriteGenerator(namedExprs: Seq[NamedExpression]): Boolean = { - namedExprs.forall { ne => - ne.resolved || { - trimNonTopLevelAliases(ne) match { - case AliasedGenerator(_, _, _) => true - case _ => false - } - } - } - } - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( _.containsPattern(GENERATOR), ruleId) { case p @ Project(Seq(UnresolvedStarWithColumns(_, _, _)), _) => @@ -3015,8 +3001,11 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor val generators = aggList.filter(hasGenerator).map(trimAlias) throw QueryCompilationErrors.moreThanOneGeneratorError(generators) - case Aggregate(groupList, aggList, child, _) if canRewriteGenerator(aggList) && - aggList.exists(hasGenerator) => + case Aggregate(groupList, aggList, child, _) if + aggList.forall { + case AliasedGenerator(_, _, _) => true + case other => other.resolved + } && aggList.exists(hasGenerator) => // If generator in the aggregate list was visited, set the boolean flag true. var generatorVisited = false @@ -3061,8 +3050,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // first for replacing `Project` with `Aggregate`. p - case p @ Project(projectList, child) if canRewriteGenerator(projectList) && - projectList.exists(hasGenerator) => + // The star will be expanded differently if we insert `Generate` under `Project` too early. + case p @ Project(projectList, child) if !projectList.exists(_.exists(_.isInstanceOf[Star])) => val (resolvedGenerator, newProjectList) = projectList .map(trimNonTopLevelAliases) .foldLeft((None: Option[Generate], Nil: Seq[NamedExpression])) { (res, e) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala index d9c7857055c0..ae4e034b6696 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala @@ -428,7 +428,14 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { // Lateral column alias does not have qualifiers. We always use the first name part to // look up lateral column aliases. val lowerCasedName = u.nameParts.head.toLowerCase(Locale.ROOT) - aliasMap.get(lowerCasedName).map { + aliasMap.get(lowerCasedName).filter { + // Do not resolve LCA with aliased `Generator`, as it will be rewritten by the rule + // `ExtractGenerator` with fresh output attribute IDs. The `Generator` will be pulled + // out and put in a `Generate` node below `Project`, so that we can resolve the column + // normally without LCA resolution. + case scala.util.Left(alias) => !alias.child.isInstanceOf[Generator] + case _ => true + }.map { case scala.util.Left(alias) => if (alias.resolved) { val resolvedAttr = resolveExpressionByPlanOutput( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala index 3def42cd7ee5..ae8d84aadb03 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala @@ -1366,40 +1366,21 @@ class LateralColumnAliasSuite extends LateralColumnAliasSuiteBase { sql("select 1 as a, a").queryExecution.assertAnalyzed() } - test("SPARK-49349: Improve error message for LCA with Generate") { - checkError( - exception = intercept[AnalysisException] { - sql( - s""" - |SELECT - | explode(split(name , ',')) AS new_name, - | new_name like 'a%' - |FROM $testTable - |""".stripMargin) - }, - condition = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR", - sqlState = "0A000", - parameters = Map( - "lca" -> "`new_name`", - "generatorExpr" -> "\"unresolvedalias(lateralAliasReference(new_name) LIKE a%)\"")) - - checkError( - exception = intercept[AnalysisException] { - sql( - s""" - |SELECT - | explode_outer(from_json(name,'array<struct<values:string>>')) as newName, - | size(from_json(newName.values,'array<string>')) + - | size(array(from_json(newName.values,'map<string,string>'))) as size - |FROM $testTable - |""".stripMargin) - }, - condition = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR", - sqlState = "0A000", - parameters = Map( - "lca" -> "`newName.values`", - "generatorExpr" -> ("\"(size(from_json(lateralAliasReference(newName.values), " + - "array<string>)) + size(array(from_json(lateralAliasReference(newName.values), " + - "map<string,string>)))) AS size\""))) + test("LateralColumnAlias with Generate") { + checkAnswer( + sql("WITH cte AS (SELECT EXPLODE(ARRAY(1, 2, 3)) AS c1, c1) SELECT * FROM cte"), + Row(1, 1) :: Row(2, 2) :: Row(3, 3) :: Nil + ) + checkAnswer( + sql( + s""" + |SELECT + | explode(split(name , ',')) AS new_name, + | new_name like 'a%' + |FROM $testTable + |""".stripMargin), + Row("alex", true) :: Row("amy", true) :: Row("cathy", false) :: + Row("david", false) :: Row("jen", false) :: Nil + ) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org