This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 71f97490afb0 [SPARK-47241][SQL][FOLLOWUP] Fix issue when laterally 
referencing a `Generator`
71f97490afb0 is described below

commit 71f97490afb029997100147429903b9e41812a60
Author: Mihailo Timotic <mihailo.timo...@databricks.com>
AuthorDate: Thu Mar 20 13:28:03 2025 +0800

    [SPARK-47241][SQL][FOLLOWUP] Fix issue when laterally referencing a 
`Generator`
    
    ### What changes were proposed in this pull request?
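    Fix analysis of queries in which a lateral column alias references the
    output of an aliased `Generator`. Such a reference is no longer resolved as
    a lateral column alias; instead, `ExtractGenerator` pulls the `Generator`
    out into a `Generate` node below the `Project`, and the column is then
    resolved normally.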
    
    ### Why are the changes needed?
    Fix the following query pattern:
    ```
    WITH cte AS (SELECT EXPLODE(ARRAY(1, 2, 3)) AS c1, c1) SELECT * FROM cte
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a test case to `LateralColumnAliasSuite`, replacing the previous
    SPARK-49349 test that expected an error for this pattern.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50310 from mihailotim-db/mihailotim-db/generator_lca.
    
    Authored-by: Mihailo Timotic <mihailo.timo...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 62c06692e47ef4b8488ea5d8ecae7c708f0ee808)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 25 +++--------
 .../catalyst/analysis/ColumnResolutionHelper.scala |  9 +++-
 .../apache/spark/sql/LateralColumnAliasSuite.scala | 51 +++++++---------------
 3 files changed, 31 insertions(+), 54 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8adc3b3dd4ab..2e05024e8690 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2983,20 +2983,6 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       }
     }
 
-    // We must wait until all expressions except for generator functions are 
resolved before
-    // rewriting generator functions in Project/Aggregate. This is necessary 
to make this rule
-    // stable for different execution orders of analyzer rules. See also 
SPARK-47241.
-    private def canRewriteGenerator(namedExprs: Seq[NamedExpression]): Boolean 
= {
-      namedExprs.forall { ne =>
-        ne.resolved || {
-          trimNonTopLevelAliases(ne) match {
-            case AliasedGenerator(_, _, _) => true
-            case _ => false
-          }
-        }
-      }
-    }
-
     def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
       _.containsPattern(GENERATOR), ruleId) {
       case p @ Project(Seq(UnresolvedStarWithColumns(_, _, _)), _) =>
@@ -3015,8 +3001,11 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         val generators = aggList.filter(hasGenerator).map(trimAlias)
         throw QueryCompilationErrors.moreThanOneGeneratorError(generators)
 
-      case Aggregate(groupList, aggList, child, _) if 
canRewriteGenerator(aggList) &&
-          aggList.exists(hasGenerator) =>
+      case Aggregate(groupList, aggList, child, _) if
+        aggList.forall {
+          case AliasedGenerator(_, _, _) => true
+          case other => other.resolved
+        } && aggList.exists(hasGenerator) =>
         // If generator in the aggregate list was visited, set the boolean 
flag true.
         var generatorVisited = false
 
@@ -3061,8 +3050,8 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         // first for replacing `Project` with `Aggregate`.
         p
 
-      case p @ Project(projectList, child) if canRewriteGenerator(projectList) 
&&
-          projectList.exists(hasGenerator) =>
+      // The star will be expanded differently if we insert `Generate` under 
`Project` too early.
+      case p @ Project(projectList, child) if 
!projectList.exists(_.exists(_.isInstanceOf[Star])) =>
         val (resolvedGenerator, newProjectList) = projectList
           .map(trimNonTopLevelAliases)
           .foldLeft((None: Option[Generate], Nil: Seq[NamedExpression])) { 
(res, e) =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index d9c7857055c0..ae4e034b6696 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -428,7 +428,14 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
           // Lateral column alias does not have qualifiers. We always use the 
first name part to
           // look up lateral column aliases.
           val lowerCasedName = u.nameParts.head.toLowerCase(Locale.ROOT)
-          aliasMap.get(lowerCasedName).map {
+          aliasMap.get(lowerCasedName).filter {
+            // Do not resolve LCA with aliased `Generator`, as it will be 
rewritten by the rule
+            // `ExtractGenerator` with fresh output attribute IDs. The 
`Generator` will be pulled
+            // out and put in a `Generate` node below `Project`, so that we 
can resolve the column
+            // normally without LCA resolution.
+            case scala.util.Left(alias) => !alias.child.isInstanceOf[Generator]
+            case _ => true
+          }.map {
             case scala.util.Left(alias) =>
               if (alias.resolved) {
                 val resolvedAttr = resolveExpressionByPlanOutput(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
index 3def42cd7ee5..ae8d84aadb03 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala
@@ -1366,40 +1366,21 @@ class LateralColumnAliasSuite extends 
LateralColumnAliasSuiteBase {
     sql("select 1 as a, a").queryExecution.assertAnalyzed()
   }
 
-  test("SPARK-49349: Improve error message for LCA with Generate") {
-    checkError(
-      exception = intercept[AnalysisException] {
-        sql(
-          s"""
-            |SELECT
-            |  explode(split(name , ',')) AS new_name,
-            |  new_name like 'a%'
-            |FROM $testTable
-            |""".stripMargin)
-      },
-      condition = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR",
-      sqlState = "0A000",
-      parameters = Map(
-        "lca" -> "`new_name`",
-        "generatorExpr" -> "\"unresolvedalias(lateralAliasReference(new_name) 
LIKE a%)\""))
-
-    checkError(
-      exception = intercept[AnalysisException] {
-        sql(
-          s"""
-             |SELECT
-             |  explode_outer(from_json(name,'array<struct<values:string>>')) 
as newName,
-             |  size(from_json(newName.values,'array<string>')) +
-             |    size(array(from_json(newName.values,'map<string,string>'))) 
as size
-             |FROM $testTable
-             |""".stripMargin)
-      },
-      condition = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR",
-      sqlState = "0A000",
-      parameters = Map(
-        "lca" -> "`newName.values`",
-        "generatorExpr" -> 
("\"(size(from_json(lateralAliasReference(newName.values), " +
-          "array<string>)) + 
size(array(from_json(lateralAliasReference(newName.values), " +
-          "map<string,string>)))) AS size\"")))
+  test("LateralColumnAlias with Generate") {
+    checkAnswer(
+      sql("WITH cte AS (SELECT EXPLODE(ARRAY(1, 2, 3)) AS c1, c1) SELECT * 
FROM cte"),
+      Row(1, 1) :: Row(2, 2) :: Row(3, 3) :: Nil
+    )
+    checkAnswer(
+      sql(
+        s"""
+           |SELECT
+           |  explode(split(name , ',')) AS new_name,
+           |  new_name like 'a%'
+           |FROM $testTable
+           |""".stripMargin),
+      Row("alex", true) :: Row("amy", true) :: Row("cathy", false) ::
+        Row("david", false) :: Row("jen", false) :: Nil
+    )
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to