This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 09979af0d9cd [SPARK-53339][CONNECT] Fix interrupt on pending
operations by moving `postStarted()` and allowing Pending to Canceled/Failed
transition
09979af0d9cd is described below
commit 09979af0d9cd4c20fd0ed082362273d15fe739d1
Author: Kousuke Saruta <[email protected]>
AuthorDate: Mon Mar 16 16:52:48 2026 +0900
[SPARK-53339][CONNECT] Fix interrupt on pending operations by moving
`postStarted()` and allowing Pending to Canceled/Failed transition
### What changes were proposed in this pull request?
This PR aims to solve SPARK-53339 using a different approach than #52083.
The issue is that interrupting an operation in `Pending` state causes an
`IllegalStateException` and leaves the operation in a broken state where
subsequent interrupts never work.
The root cause is that in
`SparkConnectExecutionManager#createExecuteHolderAndAttach`, there was a window
between `createExecuteHolder` (which registers the operation) and
`postStarted()` where the operation was registered but still in `Pending`
state. If an interrupt arrived during this window:
1. `ExecuteThreadRunner#interrupt()` transitioned `state` from `notStarted`
to `interrupted` via CAS
2. `ErrorUtils.handleError` was called with `isInterrupted=true`, which
called `postCanceled()`
3. `postCanceled()` threw `IllegalStateException` because `Pending` was not
in its allowed source statuses
4. All subsequent interrupts for the same operation failed silently because
`ExecuteThreadRunner.state` was already in the terminal `interrupted` state
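The race above can be sketched with a minimal CAS state machine. This is an illustrative model only (the names `RunnerStateSketch`, `tryStart`, and `tryInterrupt` are hypothetical, not the actual Spark identifiers): whichever side wins the `compareAndSet` decides whether `postStarted()` may ever run, and a losing second interrupt observes the already-terminal state (step 4).

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of the ExecuteThreadRunner state machine described above.
class RunnerStateSketch {
  import RunnerStateSketch._
  private val state = new AtomicInteger(NotStarted)

  // Execution thread: postStarted() would only be reached after winning this CAS.
  def tryStart(): Boolean = state.compareAndSet(NotStarted, Started)

  // Interrupt path: wins the CAS only if execution has not started yet.
  def tryInterrupt(): Boolean = state.compareAndSet(NotStarted, Interrupted)
}

object RunnerStateSketch {
  val NotStarted = 0
  val Started = 1
  val Interrupted = 2
}
```

With this model, an interrupt landing in the registration window makes `tryInterrupt()` succeed, `tryStart()` fail, and every later `tryInterrupt()` fail as well, which is why subsequent interrupts went silent before the fix.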
This issue can be reproduced by inserting `Thread.sleep(1000)` into
`SparkConnectExecutionManager#createExecuteHolderAndAttach` as follows:
```
  val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
  try {
+   Thread.sleep(1000)
    executeHolder.eventsManager.postStarted()
    executeHolder.start()
  } catch {
```
Then run the test `interrupt all - background queries, foreground
interrupt` in `SparkSessionE2ESuite`:
```
$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'
```
The fix consists of:
1. **Move `postStarted()` into `ExecuteThreadRunner#executeInternal()`** —
Previously, `postStarted()` was called in `createExecuteHolderAndAttach` before
`start()`, creating a window where an interrupt could race with the status
transition. By moving `postStarted()` to right after the `notStarted ->
started` CAS in `executeInternal()`, the status transition and the CAS are now
sequenced — if interrupt wins the CAS (`notStarted -> interrupted`),
`postStarted()` is never called.
2. **Allow `Pending -> Canceled` and `Pending -> Failed` transitions** —
When interrupt wins the CAS before `postStarted()` is called,
`ExecuteEventsManager._status` is still `Pending`. The `postCanceled()` call
from `ErrorUtils.handleError` needs to transition from `Pending` to `Canceled`.
Similarly, `postFailed()` needs to handle the case where `postStarted()` itself
throws an exception (e.g., session state check failure) while `_status` is
still `Pending`.
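The widened transition guard of change 2 can be sketched as follows. This is a stand-in model, not the real `ExecuteEventsManager` (the helper `StatusGuardSketch` and its reduced status set are hypothetical): `Pending` is now an accepted source status for both terminal paths, so an interrupt or failure that arrives before `postStarted()` no longer trips the guard.

```scala
// Hypothetical sketch of the assertStatus guard after the fix.
object StatusGuardSketch {
  sealed trait Status
  case object Pending extends Status
  case object Started extends Status
  case object Canceled extends Status
  case object Failed extends Status

  private def assertStatus(allowed: List[Status], current: Status): Unit =
    if (!allowed.contains(current))
      throw new IllegalStateException(s"Cannot transition from $current")

  // After the fix, Pending is a valid source for cancellation...
  def postCanceled(current: Status): Status = {
    assertStatus(List(Pending, Started), current)
    Canceled
  }

  // ...and for failure, covering postStarted() itself throwing.
  def postFailed(current: Status): Status = {
    assertStatus(List(Pending, Started), current)
    Failed
  }
}
```

Before the fix, the equivalent of `postCanceled(Pending)` threw `IllegalStateException`, which is exactly step 3 of the race described above.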
3. **Remove plan validation from `postStarted()`** — `postStarted()`
previously threw `UnsupportedOperationException` for unknown `OpTypeCase`
values (e.g., `OPTYPE_NOT_SET`). This was an implicit validation that doesn't
belong in `postStarted()`, whose responsibility is status transition and
listener event firing. The `case _` branch now falls back to `request.getPlan`
instead of throwing, since the `plan` variable is only used for generating the
`statement` text in the listener even [...]
4. **Add early plan validation in `createExecuteHolderAndAttach`** — Since
`postStarted()` was moved into `executeInternal()` (change 1) and no longer
validates the plan (change 3), invalid plans that previously failed
synchronously in `postStarted()` would now fail asynchronously inside the
execution thread. This means the existing `catch` block in
`createExecuteHolderAndAttach` — which calls `removeExecuteHolder` to clean up
the holder — would no longer be triggered for invalid plan [...]
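The shape of the early validation in change 4 can be illustrated without the protobuf dependency. The enum and error below are stand-ins (`PlanValidationSketch` and `IllegalArgumentException` replace the real `proto.Plan.OpTypeCase` and `InvalidInputErrors.invalidOneOfField`): the point is that the check runs synchronously before `start()`, so the existing `catch` block can still clean up the holder.

```scala
// Hypothetical sketch of the synchronous plan validation added before start().
object PlanValidationSketch {
  sealed trait OpTypeCase
  case object Root extends OpTypeCase
  case object Command extends OpTypeCase
  case object OpTypeNotSet extends OpTypeCase

  // Throws synchronously for invalid plans, mirroring the early check in
  // createExecuteHolderAndAttach.
  def validate(opType: OpTypeCase): Unit = opType match {
    case Root | Command => // valid
    case other => throw new IllegalArgumentException(s"Invalid plan: $other")
  }
}
```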
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add new tests.
I also confirmed that `SparkSessionE2ESuite` mentioned above succeeded.
### Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Opus 4.6
Closes #54774 from sarutak/SPARK-53339-2.
Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
---
.../connect/execution/ExecuteThreadRunner.scala | 5 ++++
.../sql/connect/service/ExecuteEventsManager.scala | 10 +++++---
.../service/SparkConnectExecutionManager.scala | 12 ++++++++-
.../service/ExecuteEventsManagerSuite.scala | 30 ++++++++++++++++++++++
4 files changed, 53 insertions(+), 4 deletions(-)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index b7c335c6cfcf..9f606b698d30 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -193,6 +193,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
return
}
+ // SPARK-53339: Post the Started event here, right after the CAS succeeds, to ensure that
+ // postStarted() is never called when interrupt() has already transitioned the state to
+ // interrupted. This eliminates the race between postStarted() and interrupt().
+ executeHolder.eventsManager.postStarted()
+
// `withSession` ensures that session-specific artifacts (such as JARs and class files) are
// available during processing.
executeHolder.sessionHolder.withSession { session =>
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index fcf01d5d29ab..286163e135d2 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -188,9 +188,7 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
request.getPlan.getOpTypeCase match {
  case proto.Plan.OpTypeCase.COMMAND => request.getPlan.getCommand
  case proto.Plan.OpTypeCase.ROOT => request.getPlan.getRoot
- case _ =>
-   throw new UnsupportedOperationException(
-     s"${request.getPlan.getOpTypeCase} not supported.")
+ case _ => request.getPlan
}
val event = SparkListenerConnectOperationStarted(
@@ -248,8 +246,11 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
* Post @link org.apache.spark.sql.connect.service.SparkListenerConnectOperationCanceled.
*/
def postCanceled(): Unit = {
+ // SPARK-53339: Pending is included to handle the case where interrupt() is called before
+ // postStarted() transitions the status from Pending to Started.
assertStatus(
List(
+ ExecuteStatus.Pending,
ExecuteStatus.Started,
ExecuteStatus.Analyzed,
ExecuteStatus.ReadyForExecution,
@@ -269,8 +270,11 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
* The message of the error thrown during the request.
*/
def postFailed(errorMessage: String): Unit = {
+ // SPARK-53339: Pending is included to handle the case where postStarted() itself throws
+ // an exception (e.g., session state check failure) before transitioning from Pending.
assertStatus(
List(
+ ExecuteStatus.Pending,
ExecuteStatus.Started,
ExecuteStatus.Analyzed,
ExecuteStatus.ReadyForExecution,
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index bb51438ce90f..2a936b526d96 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.connect.IllegalStateErrors
import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
+import org.apache.spark.sql.connect.planner.InvalidInputErrors
import org.apache.spark.util.ThreadUtils
// Unique key identifying execution by combination of user, session and operation id
@@ -191,7 +192,16 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
responseObserver: StreamObserver[proto.ExecutePlanResponse]): ExecuteHolder = {
val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
try {
- executeHolder.eventsManager.postStarted()
+ // SPARK-53339: Validate the plan before starting the execution thread.
+ // postStarted() was moved into executeInternal(), so invalid plans that previously
+ // caused postStarted() to throw (and thus triggered removeExecuteHolder in this
+ // catch block) now fail asynchronously inside the execution thread. This early
+ // validation ensures that invalid plans are still caught synchronously here.
+ request.getPlan.getOpTypeCase match {
+   case proto.Plan.OpTypeCase.ROOT | proto.Plan.OpTypeCase.COMMAND => // valid
+   case other =>
+     throw InvalidInputErrors.invalidOneOfField(other, request.getPlan.getDescriptorForType)
+ }
executeHolder.start()
} catch {
// Errors raised before the execution holder has finished spawning a thread are considered
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
index a96d0ab977c5..f5349a48330c 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
@@ -138,6 +138,36 @@ class ExecuteEventsManagerSuite
.isInstanceOf[SparkListenerConnectOperationCanceled])
}
+ test("SPARK-53339: post canceled from Pending state") {
+ val events = setupEvents(ExecuteStatus.Pending)
+ events.postCanceled()
+ assert(events.status == ExecuteStatus.Canceled)
+ assert(events.terminationReason.contains(TerminationReason.Canceled))
+ }
+
+ test("SPARK-53339: post failed from Pending state") {
+ val events = setupEvents(ExecuteStatus.Pending)
+ events.postFailed(DEFAULT_ERROR)
+ assert(events.status == ExecuteStatus.Failed)
+ assert(events.terminationReason.contains(TerminationReason.Failed))
+ }
+
+ test("SPARK-53339: Pending to Canceled to Closed transition") {
+ val events = setupEvents(ExecuteStatus.Pending)
+ events.postCanceled()
+ events.postClosed()
+ assert(events.status == ExecuteStatus.Closed)
+ assert(events.terminationReason.contains(TerminationReason.Canceled))
+ }
+
+ test("SPARK-53339: Pending to Failed to Closed transition") {
+ val events = setupEvents(ExecuteStatus.Pending)
+ events.postFailed(DEFAULT_ERROR)
+ events.postClosed()
+ assert(events.status == ExecuteStatus.Closed)
+ assert(events.terminationReason.contains(TerminationReason.Failed))
+ }
+
test("SPARK-43923: post failed") {
val events = setupEvents(ExecuteStatus.Started)
events.postFailed(DEFAULT_ERROR)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]