This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 08023c08800d [SPARK-50176][CONNECT][FOLLOWUP][3.5] Fix
ReattachableExecuteSuite failure
08023c08800d is described below
commit 08023c08800da9700a4491cb7e979b4287dbfb46
Author: Changgyoo Park <[email protected]>
AuthorDate: Mon Nov 4 10:17:39 2024 -0800
[SPARK-50176][CONNECT][FOLLOWUP][3.5] Fix ReattachableExecuteSuite failure
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/48725 closes a session completely
during ReattachableExecuteSuite, causing the 'sleep' UDF to be unavailable in
subsequent test cases.
https://github.com/apache/spark/pull/43546 fixes the issue by re-creating
the 'sleep' UDF in each test case that needs it, and this PR back-ports that
part of it.
### Why are the changes needed?
In order to make the test green again.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48745 from changgyoopark-db/SPARK-50176-3.5.
Authored-by: Changgyoo Park <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connect/service/SparkConnectReattachExecuteHandler.scala | 4 +++-
.../scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala | 4 ----
.../apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala | 4 ++++
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index deb29e49367a..b8363c15e6ba 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -30,8 +30,10 @@ class SparkConnectReattachExecuteHandler(
def handle(v: proto.ReattachExecuteRequest): Unit = {
// An exception will be raised if the session is not available.
- val _sessionHolder =
+ val sessionHolder =
       SparkConnectService.getIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+ assert(sessionHolder != null)
+
val executeHolder = SparkConnectService.executionManager
      .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
.getOrElse {
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
index eddd1c6be72b..234ee526d438 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
@@ -52,10 +52,6 @@ trait SparkConnectServerTest extends SharedSparkSession {
     withSparkEnvConfs((Connect.CONNECT_GRPC_BINDING_PORT.key, serverPort.toString)) {
SparkConnectService.start(spark.sparkContext)
}
-    // register udf directly on the server, we're not testing client UDFs here...
-    val serverSession =
-      SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session
-    serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms }))
}
override def afterAll(): Unit = {
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 28c1a8e4e109..f828e45e6a6c 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -367,6 +367,10 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
}
test("long sleeping query") {
+    // register udf directly on the server, we're not testing client UDFs here...
+    val serverSession =
+      SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session
+    serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms }))
     // query will be sleeping and not returning results, while having multiple reattach
withSparkEnvConfs(
       (Connect.CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION.key, "1s")) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]