This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 0baa5c7d2b7 [SPARK-36718][SQL][FOLLOWUP] Fix the `isExtractOnly` check 
in CollapseProject
0baa5c7d2b7 is described below

commit 0baa5c7d2b71f379fad6a8a0b72f427acf70f4e4
Author: Wenchen Fan <cloud0...@gmail.com>
AuthorDate: Wed May 11 21:58:14 2022 -0700

    [SPARK-36718][SQL][FOLLOWUP] Fix the `isExtractOnly` check in 
CollapseProject
    
    This PR fixes a perf regression in Spark 3.3 caused by 
https://github.com/apache/spark/pull/33958
    
    In `CollapseProject`, we want to treat `CreateStruct` and its friends as 
cheap expressions if they are only referenced by `ExtractValue`, but the check 
is too conservative, which causes a perf regression. This PR fixes this check. 
Now "extract-only" means: the attribute only appears as a child of 
`ExtractValue`, but the consumer expression can be in any shape.
    
    Fixes perf regression
    
    No
    
    new tests
    
    Closes #36510 from cloud-fan/bug.
    
    Lead-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 547f032d04bd2cf06c54b5a4a2f984f5166beb7d)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala       | 14 ++++++++------
 .../sql/catalyst/optimizer/CollapseProjectSuite.scala  | 18 +++++++++++++++---
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 753d81e4003..759a7044f15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -997,12 +997,14 @@ object CollapseProject extends Rule[LogicalPlan] with 
AliasHelper {
       }
   }
 
-  @scala.annotation.tailrec
-  private def isExtractOnly(expr: Expression, ref: Attribute): Boolean = expr 
match {
-    case a: Alias => isExtractOnly(a.child, ref)
-    case e: ExtractValue => isExtractOnly(e.children.head, ref)
-    case a: Attribute => a.semanticEquals(ref)
-    case _ => false
+  private def isExtractOnly(expr: Expression, ref: Attribute): Boolean = {
+    def hasRefInNonExtractValue(e: Expression): Boolean = e match {
+      case a: Attribute => a.semanticEquals(ref)
+      // The first child of `ExtractValue` is the complex type to be extracted.
+      case e: ExtractValue if e.children.head.semanticEquals(ref) => false
+      case _ => e.children.exists(hasRefInNonExtractValue)
+    }
+    !hasRefInNonExtractValue(expr)
   }
 
   /**
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
index c1d13d14b05..f6c3209726b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
@@ -30,7 +30,8 @@ class CollapseProjectSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", FixedPoint(10), EliminateSubqueryAliases) ::
-      Batch("CollapseProject", Once, CollapseProject) :: Nil
+      Batch("CollapseProject", Once, CollapseProject) ::
+      Batch("SimplifyExtractValueOps", Once, SimplifyExtractValueOps) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int)
@@ -123,12 +124,23 @@ class CollapseProjectSuite extends PlanTest {
 
   test("SPARK-36718: do not collapse project if non-cheap expressions will be 
repeated") {
     val query = testRelation
-      .select(('a + 1).as('a_plus_1))
-      .select(('a_plus_1 + 'a_plus_1).as('a_2_plus_2))
+      .select(($"a" + 1).as("a_plus_1"))
+      .select(($"a_plus_1" + $"a_plus_1").as("a_2_plus_2"))
       .analyze
 
     val optimized = Optimize.execute(query)
     comparePlans(optimized, query)
+
+    // CreateStruct is an exception if it's only referenced by ExtractValue.
+    val query2 = testRelation
+      .select(namedStruct("a", $"a", "a_plus_1", $"a" + 1).as("struct"))
+      .select(($"struct".getField("a") + 
$"struct".getField("a_plus_1")).as("add"))
+      .analyze
+    val optimized2 = Optimize.execute(query2)
+    val expected2 = testRelation
+      .select(($"a" + ($"a" + 1)).as("add"))
+      .analyze
+    comparePlans(optimized2, expected2)
   }
 
   test("preserve top-level alias metadata while collapsing projects") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to