This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cec3c4f7c1b [SPARK-49460][SQL] Remove `cleanupResource()` from 
EmptyRelationExec
9cec3c4f7c1b is described below

commit 9cec3c4f7c1b467023f0eefff69e8b7c5105417d
Author: Ziqi Liu <[email protected]>
AuthorDate: Sat Aug 31 10:05:18 2024 +0800

    [SPARK-49460][SQL] Remove `cleanupResource()` from EmptyRelationExec
    
    ### What changes were proposed in this pull request?
    Remove cleanupResource() from `EmptyRelationExec`
    
    ### Why are the changes needed?
    
    This bug was introduced in https://github.com/apache/spark/pull/46830 : 
`cleanupResources` might be executed on the executor where `logical` is null.
    
    After revisiting the code paths relevant to cleanupResources, I think 
`EmptyRelationExec` doesn't need to do anything here.
    
    - for driver side cleanup, we have [this code 
path](https://github.com/apache/spark/blob/0602020eb3b346a8c50ad32eeda4e6dabb70c584/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala)
 to cleanup each AQE query stage.
    - for executor side cleanup, so far we only have SortMergeJoinExec, which 
invokes cleanupResources during its execution, so by the time 
EmptyRelationExec is created, it's guaranteed that the necessary cleanup has been done.
    -
    After all, `EmptyRelationExec` is only a never-executed wrapper for 
materialized physical query stages; it should not be responsible for any 
cleanup invocation.
    
    So I'm removing `cleanupResources` implementation from `EmptyRelationExec`.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    New unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    NO
    
    Closes #47931 from liuzqt/SPARK-49460.
    
    Authored-by: Ziqi Liu <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../spark/sql/execution/EmptyRelationExec.scala    | 10 ------
 .../adaptive/AdaptiveQueryExecSuite.scala          | 37 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 10 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
index 085c0b22524c..8a544de7567e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -81,13 +80,4 @@ case class EmptyRelationExec(@transient logical: 
LogicalPlan) extends LeafExecNo
   override def doCanonicalize(): SparkPlan = {
     this.copy(logical = LocalRelation(logical.output).canonicalized)
   }
-
-  override protected[sql] def cleanupResources(): Unit = {
-    logical.foreach {
-      case LogicalQueryStage(_, physical) =>
-        physical.cleanupResources()
-      case _ =>
-    }
-    super.cleanupResources()
-  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index fc54e7ecd46d..938a96a86b01 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1608,6 +1608,43 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-49460: NPE in EmptyRelationExec.cleanupResources") {
+    withTable("t1left", "t1right", "t1empty") {
+      spark.sql("create table t1left (a int, b int);")
+      spark.sql("insert into t1left values (1, 1), (2,2), (3,3);")
+      spark.sql("create table t1right (a int, b int);")
+      spark.sql("create table t1empty (a int, b int);")
+      spark.sql("insert into t1right values (2,20), (4, 40);")
+
+      spark.sql("""
+                  |with leftT as (
+                  |  with erp as (
+                  |    select
+                  |      *
+                  |    from
+                  |      t1left
+                  |      join t1empty on t1left.a = t1empty.a
+                  |      join t1right on t1left.a = t1right.a
+                  |  )
+                  |  SELECT
+                  |    CASE
+                  |      WHEN COUNT(*) = 0 THEN 4
+                  |      ELSE NULL
+                  |    END AS a
+                  |  FROM
+                  |    erp
+                  |  HAVING
+                  |    COUNT(*) = 0
+                  |)
+                  |select
+                  |  /*+ MERGEJOIN(t1right) */
+                  |  *
+                  |from
+                  |  leftT
+                  |  join t1right on leftT.a = 
t1right.a""".stripMargin).collect()
+    }
+  }
+
   test("SPARK-35585: Support propagate empty relation through project/filter") 
{
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to