This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 08023c08800d [SPARK-50176][CONNECT][FOLLOWUP][3.5] Fix ReattachableExecuteSuite failure
08023c08800d is described below

commit 08023c08800da9700a4491cb7e979b4287dbfb46
Author: Changgyoo Park <[email protected]>
AuthorDate: Mon Nov 4 10:17:39 2024 -0800

    [SPARK-50176][CONNECT][FOLLOWUP][3.5] Fix ReattachableExecuteSuite failure
    
    ### What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/48725 closes a session completely during ReattachableExecuteSuite, causing the 'sleep' UDF to be unavailable in subsequent test cases.
    
    https://github.com/apache/spark/pull/43546 fixes the issue by re-registering the 'sleep' UDF in each test case that needs it, instead of registering it once when the server starts; this PR back-ports part of that change.
    
    ### Why are the changes needed?
    
    To make ReattachableExecuteSuite pass again.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48745 from changgyoopark-db/SPARK-50176-3.5.
    
    Authored-by: Changgyoo Park <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/connect/service/SparkConnectReattachExecuteHandler.scala      | 4 +++-
 .../scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala   | 4 ----
 .../apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala | 4 ++++
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index deb29e49367a..b8363c15e6ba 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -30,8 +30,10 @@ class SparkConnectReattachExecuteHandler(
 
   def handle(v: proto.ReattachExecuteRequest): Unit = {
     // An exception will be raised if the session is not available.
-    val _sessionHolder =
+    val sessionHolder =
       SparkConnectService.getIsolatedSession(v.getUserContext.getUserId, 
v.getSessionId)
+    assert(sessionHolder != null)
+
     val executeHolder = SparkConnectService.executionManager
       .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, 
v.getOperationId))
       .getOrElse {
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
index eddd1c6be72b..234ee526d438 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
@@ -52,10 +52,6 @@ trait SparkConnectServerTest extends SharedSparkSession {
     withSparkEnvConfs((Connect.CONNECT_GRPC_BINDING_PORT.key, 
serverPort.toString)) {
       SparkConnectService.start(spark.sparkContext)
     }
-    // register udf directly on the server, we're not testing client UDFs 
here...
-    val serverSession =
-      SparkConnectService.getOrCreateIsolatedSession(defaultUserId, 
defaultSessionId).session
-    serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms 
}))
   }
 
   override def afterAll(): Unit = {
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 28c1a8e4e109..f828e45e6a6c 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -367,6 +367,10 @@ class ReattachableExecuteSuite extends 
SparkConnectServerTest {
   }
 
   test("long sleeping query") {
+    // register udf directly on the server, we're not testing client UDFs 
here...
+    val serverSession =
+      SparkConnectService.getOrCreateIsolatedSession(defaultUserId, 
defaultSessionId).session
+    serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms 
}))
     // query will be sleeping and not returning results, while having multiple 
reattach
     withSparkEnvConfs(
       (Connect.CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION.key, 
"1s")) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to