This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 0229c0ea451c [SPARK-50176][CONNECT][3.5] Disallow reattaching after 
the session is closed
0229c0ea451c is described below

commit 0229c0ea451c953e78cd273d9de2bc05962470a5
Author: changgyoopark-db <[email protected]>
AuthorDate: Fri Nov 1 12:03:21 2024 +0900

    [SPARK-50176][CONNECT][3.5] Disallow reattaching after the session is closed
    
    ### What changes were proposed in this pull request?
    
    Disallow cursors from reattaching to their corresponding ExecuteHolders after the 
session is closed.
    In order to prevent a session with a long-running query from being closed, 
the session is always accessed when reattaching.
    
    https://github.com/apache/spark/pull/44670 resolves this issue in Spark 
4.0.0.
    
    ### Why are the changes needed?
    
    SPARK-50176. Sessions with long-running queries are susceptible to cache 
eviction, which causes failures when the client tries to reattach to the execution.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    org.apache.spark.sql.connect.execution.ReattachableExecuteSuite
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48725 from changgyoopark-db/SPARK-50176-3.5.
    
    Authored-by: changgyoopark-db <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../service/SparkConnectReattachExecuteHandler.scala |  3 +++
 .../sql/connect/service/SparkConnectService.scala    |  7 +++++++
 .../connect/execution/ReattachableExecuteSuite.scala | 20 ++++++++++++++++++++
 3 files changed, 30 insertions(+)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index 393b832de878..deb29e49367a 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -29,6 +29,9 @@ class SparkConnectReattachExecuteHandler(
     extends Logging {
 
   def handle(v: proto.ReattachExecuteRequest): Unit = {
+    // An exception will be raised if the session is not available.
+    val _sessionHolder =
+      SparkConnectService.getIsolatedSession(v.getUserContext.getUserId, 
v.getSessionId)
     val executeHolder = SparkConnectService.executionManager
       .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, 
v.getOperationId))
       .getOrElse {
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index e8af2acfd2e2..edf8147eff85 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -360,6 +360,13 @@ object SparkConnectService extends Logging {
     userSessionMapping.invalidateAll()
   }
 
+  /**
+   * Used for testing
+   */
+  private[connect] def invalidateSession(userId: String, sessionId: String): 
Unit = {
+    userSessionMapping.invalidate((userId, sessionId))
+  }
+
   /**
    * Used for testing.
    */
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 06cd1a5666b6..28c1a8e4e109 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -56,6 +56,26 @@ class ReattachableExecuteSuite extends 
SparkConnectServerTest {
     }
   }
 
+  test("reattach after connection expired") {
+    withClient { client =>
+      val iter = client.execute(buildPlan(MEDIUM_RESULTS_QUERY))
+      val operationId = getReattachableIterator(iter).operationId
+      // open the iterator
+      iter.next()
+
+      SparkConnectService.invalidateSession(defaultUserId, defaultSessionId)
+      withRawBlockingStub { stub =>
+        val iter2 = 
stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
+
+        // session closed, bound to fail
+        val e = intercept[StatusRuntimeException] {
+          while (iter2.hasNext) iter2.next()
+        }
+        assert(e.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND"))
+      }
+    }
+  }
+
   test("raw interrupted RPC results in INVALID_CURSOR.DISCONNECTED error") {
     withRawBlockingStub { stub =>
       val iter = 
stub.executePlan(buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to