This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d35af54842d1 [SPARK-48428][SQL] Fix IllegalStateException in 
NestedColumnAliasing
d35af54842d1 is described below

commit d35af54842d17725aea7208c9d7b170b33a95012
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Thu Jun 27 15:31:20 2024 +0800

    [SPARK-48428][SQL] Fix IllegalStateException in NestedColumnAliasing
    
    ### What changes were proposed in this pull request?
    
    #35170 (SPARK-37855) and #32301 (SPARK-35194) introduced conditions for 
ExtractValues that currently cannot be handled. These conditions are applied 
after `collectRootReferenceAndExtractValue` and simply remove such candidates. 
This is problematic since the removed expressions might have contained 
`AttributeReference`s that must be collected to avoid incorrect aliasing. This 
fixes this family of bugs by moving the conditions into the function 
`collectRootReferenceAndExtractValue`.
    
    ### Why are the changes needed?
    
    The current code leads to `IllegalStateException` runtime failures.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, fixes a bug.
    
    ### How was this patch tested?
    
    Existing and new unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46756 from eejbyfeldt/SPARK-48428.
    
    Authored-by: Emil Ejbyfeldt <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit b11608c96f8aeeaa03c6e5038700483266b32448)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalyst/optimizer/NestedColumnAliasing.scala   | 16 ++++++++--------
 .../optimizer/NestedColumnAliasingSuite.scala       | 21 +++++++++++++++++++++
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 5d4fcf772b8f..778813e4e9c6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -217,6 +217,11 @@ object NestedColumnAliasing {
     case _ => false
   }
 
+  private def canAlias(ev: Expression): Boolean = {
+    // we can not alias the attr from lambda variable whose expr id is not 
available
+    !ev.exists(_.isInstanceOf[NamedLambdaVariable]) && ev.references.size == 1
+  }
+
   /**
    * Returns two types of expressions:
    * - Root references that are individually accessed
@@ -225,11 +230,11 @@ object NestedColumnAliasing {
    */
   private def collectRootReferenceAndExtractValue(e: Expression): 
Seq[Expression] = e match {
     case _: AttributeReference => Seq(e)
-    case GetStructField(_: ExtractValue | _: AttributeReference, _, _) => 
Seq(e)
+    case GetStructField(_: ExtractValue | _: AttributeReference, _, _) if 
canAlias(e) => Seq(e)
     case GetArrayStructFields(_: MapValues |
                               _: MapKeys |
                               _: ExtractValue |
-                              _: AttributeReference, _, _, _, _) => Seq(e)
+                              _: AttributeReference, _, _, _, _) if 
canAlias(e) => Seq(e)
     case es if es.children.nonEmpty => 
es.children.flatMap(collectRootReferenceAndExtractValue)
     case _ => Seq.empty
   }
@@ -248,13 +253,8 @@ object NestedColumnAliasing {
     val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]()
     exprList.foreach { e =>
       extractor(e).foreach {
-        // we can not alias the attr from lambda variable whose expr id is not 
available
-        case ev: ExtractValue if 
!ev.exists(_.isInstanceOf[NamedLambdaVariable]) =>
-          if (ev.references.size == 1) {
-            nestedFieldReferences.append(ev)
-          }
+        case ev: ExtractValue => nestedFieldReferences.append(ev)
         case ar: AttributeReference => otherRootReferences.append(ar)
-        case _ => // ignore
       }
     }
     val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
index cb6b9ac8d8be..6ce394dbd68b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
@@ -861,6 +861,27 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
     // The plan is expected to be unchanged.
     comparePlans(plan, RemoveNoopOperators.apply(optimized.get))
   }
+
+  test("SPARK-48428: Do not pushdown when attr is used in expression with 
mutliple references") {
+    val query = contact
+      .limit(5)
+      .select(
+        GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 
1), 0),
+        $"employer.id")
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val expected = contact
+      .select($"id", $"employer")
+      .limit(5)
+      .select(
+        GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 
1), 0),
+        $"employer.id")
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
 }
 
 object NestedColumnAliasingSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to