This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0ea9d9e71be7 [SPARK-47249][CONNECT] Fix bug where all connect
executions are considered abandoned regardless of their actual status
0ea9d9e71be7 is described below
commit 0ea9d9e71be70d52fb2d5b44a9c85ee01e4c3d46
Author: vicennial <[email protected]>
AuthorDate: Mon Mar 4 09:42:17 2024 +0900
[SPARK-47249][CONNECT] Fix bug where all connect executions are considered
abandoned regardless of their actual status
### What changes were proposed in this pull request?
Adds a guard that checks whether the execution was abandoned before putting it
into the `abandonedTombstones` cache.
### Why are the changes needed?
Fixes a bug where every Spark Connect execution, not only abandoned ones, ends
up in the `abandonedTombstones` cache because of a missing `if` guard.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes previously incorrect behaviour. A user would hit an
`OPERATION_ABANDONED` error when attempting to reattach to an operation that had
already completed (for whatever reason), even though such a reattach should not
fail — or at least should not fail with that error.
### How was this patch tested?
New unit test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45358 from vicennial/SPARK-47249.
Authored-by: vicennial <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connect/service/SparkConnectExecutionManager.scala | 6 ++++--
.../sql/connect/execution/ReattachableExecuteSuite.scala | 12 ++++++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index 85fb150b3171..58e235f15085 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -120,8 +120,10 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
// close the execution outside the lock
executeHolder.foreach { e =>
e.close()
- // Update in abandonedTombstones: above it wasn't yet updated with closedTime etc.
- abandonedTombstones.put(key, e.getExecuteInfo)
+ if (abandoned) {
+ // Update in abandonedTombstones: above it wasn't yet updated with closedTime etc.
+ abandonedTombstones.put(key, e.getExecuteInfo)
+ }
}
}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index cf1a2d5032af..3e22dc5c3fad 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -413,4 +413,16 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
assert(newAccessTime > lastAccessTime, "reattach should update session holder access time")
}
}
+
+ test("SPARK-47249: non-abandoned executions are not added to tombstone cache upon close") {
+ val dummyOpId = UUID.randomUUID().toString
+ val dummyRequest =
+ buildExecutePlanRequest(buildPlan("select * from range(1)"), operationId = dummyOpId)
+ val manager = SparkConnectService.executionManager
+ val holder = manager.createExecuteHolder(dummyRequest)
+ holder.eventsManager.postStarted()
+ manager.removeExecuteHolder(holder.key, abandoned = false)
+ val abandonedExecutions = manager.listAbandonedExecutions
+ assert(abandonedExecutions.forall(_.operationId != dummyOpId))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]