This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 09979af0d9cd [SPARK-53339][CONNECT] Fix interrupt on pending 
operations by moving `postStarted()` and allowing Pending to Canceled/Failed 
transition
09979af0d9cd is described below

commit 09979af0d9cd4c20fd0ed082362273d15fe739d1
Author: Kousuke Saruta <[email protected]>
AuthorDate: Mon Mar 16 16:52:48 2026 +0900

    [SPARK-53339][CONNECT] Fix interrupt on pending operations by moving 
`postStarted()` and allowing Pending to Canceled/Failed transition
    
    ### What changes were proposed in this pull request?
    This PR aims to solve SPARK-53339 using a different approach than #52083.
    The issue is that interrupting an operation in `Pending` state causes an 
`IllegalStateException` and leaves the operation in a broken state where 
subsequent interrupts never work.
    The root cause is that in 
`SparkConnectExecutionManager#createExecuteHolderAndAttach`, there was a window 
between `createExecuteHolder` (which registers the operation) and 
`postStarted()` where the operation was registered but still in `Pending` 
state. If an interrupt arrived during this window:
    
    1. `ExecuteThreadRunner#interrupt()` transitioned `state` from `notStarted` 
to `interrupted` via CAS
    2. `ErrorUtils.handleError` was called with `isInterrupted=true`, which 
called `postCanceled()`
    3. `postCanceled()` threw `IllegalStateException` because `Pending` was not 
in its allowed source statuses
    4. All subsequent interrupts for the same operation failed silently because 
`ExecuteThreadRunner.state` was already in the terminal `interrupted` state
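    The race above can be sketched with a minimal stand-alone model. The names
below (`Runner`, `startedEventPosted`) are illustrative only, not the actual
Spark Connect types; the point is that once `postStarted()` runs only after a
successful `notStarted -> started` CAS, an interrupt that wins the CAS first
makes the `Started` event impossible:

    ```scala
    import java.util.concurrent.atomic.AtomicReference

    // Hypothetical miniature of the ExecuteThreadRunner state machine.
    object StateRaceSketch {
      sealed trait State
      case object NotStarted extends State
      case object Started extends State
      case object Interrupted extends State

      class Runner {
        val state = new AtomicReference[State](NotStarted)
        var startedEventPosted = false

        // interrupt() wins only if execution has not started yet.
        def interrupt(): Boolean = state.compareAndSet(NotStarted, Interrupted)

        // After the fix: post the Started event only after winning the CAS,
        // so it can never race with interrupt().
        def executeInternal(): Unit = {
          if (state.compareAndSet(NotStarted, Started)) {
            startedEventPosted = true // postStarted() would run here
          }
        }
      }
    }
    ```

    If `interrupt()` runs first, `executeInternal()` loses the CAS and skips
the event entirely, which is exactly the sequencing the fix establishes.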
    
    This issue can be reproduced by inserting `Thread.sleep(1000)` into 
`SparkConnectExecutionManager#createExecuteHolderAndAttach` as follows:
    
    ```
         val executeHolder = createExecuteHolder(executeKey, request, 
sessionHolder)
         try {
    +      Thread.sleep(1000)
           executeHolder.eventsManager.postStarted()
           executeHolder.start()
         } catch {
    ```
    
    Then run the test `interrupt all - background queries, foreground 
interrupt` in `SparkSessionE2ESuite`:
    ```
    $ build/sbt 'connect-client-jvm/testOnly 
org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - 
background queries, foreground interrupt"'
    ```
    
    The fix consists of:
    
    1. **Move `postStarted()` into `ExecuteThreadRunner#executeInternal()`** — 
Previously, `postStarted()` was called in `createExecuteHolderAndAttach` before 
`start()`, creating a window where an interrupt could race with the status 
transition. By moving `postStarted()` to right after the `notStarted -> 
started` CAS in `executeInternal()`, the status transition and the CAS are now 
sequenced — if interrupt wins the CAS (`notStarted -> interrupted`), 
`postStarted()` is never called.
    
    2. **Allow `Pending -> Canceled` and `Pending -> Failed` transitions** — 
When interrupt wins the CAS before `postStarted()` is called, 
`ExecuteEventsManager._status` is still `Pending`. The `postCanceled()` call 
from `ErrorUtils.handleError` needs to transition from `Pending` to `Canceled`. 
Similarly, `postFailed()` needs to handle the case where `postStarted()` itself 
throws an exception (e.g., session state check failure) while `_status` is 
still `Pending`.
    
    3. **Remove plan validation from `postStarted()`** — `postStarted()` 
previously threw `UnsupportedOperationException` for unknown `OpTypeCase` 
values (e.g., `OPTYPE_NOT_SET`). This was an implicit validation that doesn't 
belong in `postStarted()`, whose responsibility is status transition and 
listener event firing. The `case _` branch now falls back to `request.getPlan` 
instead of throwing, since the `plan` variable is only used for generating the 
`statement` text in the listener even [...]
    
    4. **Add early plan validation in `createExecuteHolderAndAttach`** — Since 
`postStarted()` was moved into `executeInternal()` (change 1) and no longer 
validates the plan (change 3), invalid plans that previously failed 
synchronously in `postStarted()` would now fail asynchronously inside the 
execution thread. This means the existing `catch` block in 
`createExecuteHolderAndAttach` — which calls `removeExecuteHolder` to clean up 
the holder — would no longer be triggered for invalid plan [...]
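    Change 2 can be illustrated with a stand-alone model of the status check
(names are illustrative; the real logic lives in
`ExecuteEventsManager.assertStatus`, which allows more source statuses than
this sketch shows):

    ```scala
    // Hypothetical miniature of the ExecuteEventsManager status transitions.
    object StatusSketch {
      sealed trait ExecuteStatus
      case object Pending extends ExecuteStatus
      case object Started extends ExecuteStatus
      case object Canceled extends ExecuteStatus
      case object Failed extends ExecuteStatus

      class Events {
        var status: ExecuteStatus = Pending

        private def assertStatus(allowed: List[ExecuteStatus]): Unit =
          if (!allowed.contains(status)) {
            throw new IllegalStateException(s"expected one of $allowed, was $status")
          }

        // After the fix, Pending is an allowed source status for both
        // transitions, so an interrupt (or a failure in postStarted() itself)
        // that arrives before the Started event no longer throws.
        def postCanceled(): Unit = {
          assertStatus(List(Pending, Started))
          status = Canceled
        }

        def postFailed(): Unit = {
          assertStatus(List(Pending, Started))
          status = Failed
        }
      }
    }
    ```

    Before the fix, the equivalent of `postCanceled()` from `Pending` threw
`IllegalStateException`, which is the broken state described above.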
    
    ### Why are the changes needed?
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added new tests.
    I also confirmed that the `SparkSessionE2ESuite` test mentioned above succeeds.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Kiro CLI / Opus 4.6
    
    Closes #54774 from sarutak/SPARK-53339-2.
    
    Authored-by: Kousuke Saruta <[email protected]>
    Signed-off-by: Kousuke Saruta <[email protected]>
---
 .../connect/execution/ExecuteThreadRunner.scala    |  5 ++++
 .../sql/connect/service/ExecuteEventsManager.scala | 10 +++++---
 .../service/SparkConnectExecutionManager.scala     | 12 ++++++++-
 .../service/ExecuteEventsManagerSuite.scala        | 30 ++++++++++++++++++++++
 4 files changed, 53 insertions(+), 4 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index b7c335c6cfcf..9f606b698d30 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -193,6 +193,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
       return
     }
 
+    // SPARK-53339: Post the Started event here, right after the CAS succeeds, 
to ensure that
+    // postStarted() is never called when interrupt() has already transitioned 
the state to
+    // interrupted. This eliminates the race between postStarted() and 
interrupt().
+    executeHolder.eventsManager.postStarted()
+
     // `withSession` ensures that session-specific artifacts (such as JARs and 
class files) are
     // available during processing.
     executeHolder.sessionHolder.withSession { session =>
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index fcf01d5d29ab..286163e135d2 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -188,9 +188,7 @@ case class ExecuteEventsManager(executeHolder: 
ExecuteHolder, clock: Clock) {
       request.getPlan.getOpTypeCase match {
         case proto.Plan.OpTypeCase.COMMAND => request.getPlan.getCommand
         case proto.Plan.OpTypeCase.ROOT => request.getPlan.getRoot
-        case _ =>
-          throw new UnsupportedOperationException(
-            s"${request.getPlan.getOpTypeCase} not supported.")
+        case _ => request.getPlan
       }
 
     val event = SparkListenerConnectOperationStarted(
@@ -248,8 +246,11 @@ case class ExecuteEventsManager(executeHolder: 
ExecuteHolder, clock: Clock) {
    * Post @link 
org.apache.spark.sql.connect.service.SparkListenerConnectOperationCanceled.
    */
   def postCanceled(): Unit = {
+    // SPARK-53339: Pending is included to handle the case where interrupt() 
is called before
+    // postStarted() transitions the status from Pending to Started.
     assertStatus(
       List(
+        ExecuteStatus.Pending,
         ExecuteStatus.Started,
         ExecuteStatus.Analyzed,
         ExecuteStatus.ReadyForExecution,
@@ -269,8 +270,11 @@ case class ExecuteEventsManager(executeHolder: 
ExecuteHolder, clock: Clock) {
    *   The message of the error thrown during the request.
    */
   def postFailed(errorMessage: String): Unit = {
+    // SPARK-53339: Pending is included to handle the case where postStarted() 
itself throws
+    // an exception (e.g., session state check failure) before transitioning 
from Pending.
     assertStatus(
       List(
+        ExecuteStatus.Pending,
         ExecuteStatus.Started,
         ExecuteStatus.Analyzed,
         ExecuteStatus.ReadyForExecution,
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index bb51438ce90f..2a936b526d96 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -35,6 +35,7 @@ import 
org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.connect.IllegalStateErrors
 import 
org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE,
 CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, 
CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
 import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
+import org.apache.spark.sql.connect.planner.InvalidInputErrors
 import org.apache.spark.util.ThreadUtils
 
 // Unique key identifying execution by combination of user, session and 
operation id
@@ -191,7 +192,16 @@ private[connect] class SparkConnectExecutionManager() 
extends Logging {
       responseObserver: StreamObserver[proto.ExecutePlanResponse]): 
ExecuteHolder = {
     val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
     try {
-      executeHolder.eventsManager.postStarted()
+      // SPARK-53339: Validate the plan before starting the execution thread.
+      // postStarted() was moved into executeInternal(), so invalid plans that 
previously
+      // caused postStarted() to throw (and thus triggered removeExecuteHolder 
in this
+      // catch block) now fail asynchronously inside the execution thread. 
This early
+      // validation ensures that invalid plans are still caught synchronously 
here.
+      request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT | proto.Plan.OpTypeCase.COMMAND => // 
valid
+        case other =>
+          throw InvalidInputErrors.invalidOneOfField(other, 
request.getPlan.getDescriptorForType)
+      }
       executeHolder.start()
     } catch {
       // Errors raised before the execution holder has finished spawning a 
thread are considered
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
index a96d0ab977c5..f5349a48330c 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
@@ -138,6 +138,36 @@ class ExecuteEventsManagerSuite
         .isInstanceOf[SparkListenerConnectOperationCanceled])
   }
 
+  test("SPARK-53339: post canceled from Pending state") {
+    val events = setupEvents(ExecuteStatus.Pending)
+    events.postCanceled()
+    assert(events.status == ExecuteStatus.Canceled)
+    assert(events.terminationReason.contains(TerminationReason.Canceled))
+  }
+
+  test("SPARK-53339: post failed from Pending state") {
+    val events = setupEvents(ExecuteStatus.Pending)
+    events.postFailed(DEFAULT_ERROR)
+    assert(events.status == ExecuteStatus.Failed)
+    assert(events.terminationReason.contains(TerminationReason.Failed))
+  }
+
+  test("SPARK-53339: Pending to Canceled to Closed transition") {
+    val events = setupEvents(ExecuteStatus.Pending)
+    events.postCanceled()
+    events.postClosed()
+    assert(events.status == ExecuteStatus.Closed)
+    assert(events.terminationReason.contains(TerminationReason.Canceled))
+  }
+
+  test("SPARK-53339: Pending to Failed to Closed transition") {
+    val events = setupEvents(ExecuteStatus.Pending)
+    events.postFailed(DEFAULT_ERROR)
+    events.postClosed()
+    assert(events.status == ExecuteStatus.Closed)
+    assert(events.terminationReason.contains(TerminationReason.Failed))
+  }
+
   test("SPARK-43923: post failed") {
     val events = setupEvents(ExecuteStatus.Started)
     events.postFailed(DEFAULT_ERROR)

