This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 0229c0ea451c [SPARK-50176][CONNECT][3.5] Disallow reattaching after
the session is closed
0229c0ea451c is described below
commit 0229c0ea451c953e78cd273d9de2bc05962470a5
Author: changgyoopark-db <[email protected]>
AuthorDate: Fri Nov 1 12:03:21 2024 +0900
[SPARK-50176][CONNECT][3.5] Disallow reattaching after the session is closed
### What changes were proposed in this pull request?
Disallow cursors from reattaching to their corresponding ExecuteHolders after the
session is closed.
In order to prevent a session with a long-running query from being closed,
the session is always accessed when reattaching.
https://github.com/apache/spark/pull/44670 resolves this issue in Spark
4.0.0.
### Why are the changes needed?
SPARK-50176. Sessions with long-running queries are susceptible to cache
eviction, causing trouble when the client tries to reattach to the execution.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
org.apache.spark.sql.connect.execution.ReattachableExecuteSuite
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48725 from changgyoopark-db/SPARK-50176-3.5.
Authored-by: changgyoopark-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../service/SparkConnectReattachExecuteHandler.scala | 3 +++
.../sql/connect/service/SparkConnectService.scala | 7 +++++++
.../connect/execution/ReattachableExecuteSuite.scala | 20 ++++++++++++++++++++
3 files changed, 30 insertions(+)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index 393b832de878..deb29e49367a 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -29,6 +29,9 @@ class SparkConnectReattachExecuteHandler(
extends Logging {
def handle(v: proto.ReattachExecuteRequest): Unit = {
+ // An exception will be raised if the session is not available.
+ val _sessionHolder =
+ SparkConnectService.getIsolatedSession(v.getUserContext.getUserId,
v.getSessionId)
val executeHolder = SparkConnectService.executionManager
.getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId,
v.getOperationId))
.getOrElse {
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index e8af2acfd2e2..edf8147eff85 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -360,6 +360,13 @@ object SparkConnectService extends Logging {
userSessionMapping.invalidateAll()
}
+ /**
+ * Used for testing.
+ */
+ private[connect] def invalidateSession(userId: String, sessionId: String):
Unit = {
+ userSessionMapping.invalidate((userId, sessionId))
+ }
+
/**
* Used for testing.
*/
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 06cd1a5666b6..28c1a8e4e109 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -56,6 +56,26 @@ class ReattachableExecuteSuite extends
SparkConnectServerTest {
}
}
+ test("reattach after connection expired") {
+ withClient { client =>
+ val iter = client.execute(buildPlan(MEDIUM_RESULTS_QUERY))
+ val operationId = getReattachableIterator(iter).operationId
+ // open the iterator
+ iter.next()
+
+ SparkConnectService.invalidateSession(defaultUserId, defaultSessionId)
+ withRawBlockingStub { stub =>
+ val iter2 =
stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
+
+ // session closed, bound to fail
+ val e = intercept[StatusRuntimeException] {
+ while (iter2.hasNext) iter2.next()
+ }
+ assert(e.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND"))
+ }
+ }
+ }
+
test("raw interrupted RPC results in INVALID_CURSOR.DISCONNECTED error") {
withRawBlockingStub { stub =>
val iter =
stub.executePlan(buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]