This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 26e59f26b10c [SPARK-49492][CONNECT] Reattach attempted on inactive ExecutionHolder
26e59f26b10c is described below

commit 26e59f26b10c1647b4517869fc911fb7e13fdd22
Author: Changgyoo Park <[email protected]>
AuthorDate: Fri Sep 6 12:09:30 2024 +0900

    [SPARK-49492][CONNECT] Reattach attempted on inactive ExecutionHolder
    
    ### What changes were proposed in this pull request?
    
    Check the status of the ExecutionHolder before reattaching it to the client.
    
    ### Why are the changes needed?
    
    An ExecutionHolder may fail to spawn an ExecuteThreadRunner if an exception
    is thrown before doing so. In that case, reattaching the ExecutionHolder
    succeeds, but no progress is ever made.
    -> *As a result, the job hangs.*
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add a test case to ReattachableExecuteSuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47955 from changgyoopark-db/SPARK-49492.
    
    Authored-by: Changgyoo Park <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../service/SparkConnectExecutePlanHandler.scala   |  9 +++++++++
 .../execution/ReattachableExecuteSuite.scala       | 23 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
index 1ab5f26f90b1..73a20e448be8 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
@@ -31,6 +31,15 @@ class SparkConnectExecutePlanHandler(responseObserver: 
StreamObserver[proto.Exec
     try {
       executeHolder.eventsManager.postStarted()
       executeHolder.start()
+    } catch {
+      // Errors raised before the execution holder has finished spawning a 
thread are considered
+      // plan execution failure, and the client should not try reattaching it 
afterwards.
+      case t: Throwable =>
+        
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
+        throw t
+    }
+
+    try {
       val responseSender =
         new 
ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, 
responseObserver)
       executeHolder.runGrpcResponseSender(responseSender)
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 25e6cc48a199..2606284c25bd 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -429,4 +429,27 @@ class ReattachableExecuteSuite extends 
SparkConnectServerTest {
     val abandonedExecutions = manager.listAbandonedExecutions
     assert(abandonedExecutions.forall(_.operationId != dummyOpId))
   }
+
+  test("SPARK-49492: reattach must not succeed on an inactive execution 
holder") {
+    withRawBlockingStub { stub =>
+      val operationId = UUID.randomUUID().toString
+
+      // supply an invalid plan so that the execute plan handler raises an 
error
+      val iter = stub.executePlan(
+        buildExecutePlanRequest(proto.Plan.newBuilder().build(), operationId = 
operationId))
+
+      // expect that the execution fails before spawning an execute thread
+      val ee = intercept[StatusRuntimeException] {
+        iter.next()
+      }
+      assert(ee.getMessage.contains("INTERNAL"))
+
+      // reattach must fail
+      val reattach = 
stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
+      val re = intercept[StatusRuntimeException] {
+        reattach.hasNext()
+      }
+      assert(re.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to