This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c8ea75d92ee [SPARK-46660][CONNECT] ReattachExecute requests update aliveness of SessionHolder
2c8ea75d92ee is described below

commit 2c8ea75d92ee49284a0eb2aaecb010d1112d5bef
Author: vicennial <venkata.gud...@databricks.com>
AuthorDate: Mon Jan 15 17:35:26 2024 +0900

    [SPARK-46660][CONNECT] ReattachExecute requests update aliveness of SessionHolder
    
    ### What changes were proposed in this pull request?
    
    This PR makes `SparkConnectReattachExecuteHandler` fetch the `ExecuteHolder` via the `SessionHolder`, which in turn refreshes its aliveness. Further, this makes it consistent with `SparkConnectReleaseExecuteHandler`.
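    In outline, the handler now resolves the `SessionHolder` first and only then looks up the `ExecuteHolder` within it, so the session's last-access timestamp is bumped on every reattach. A simplified sketch of the new lookup path (taken from the diff below; the abandoned-operation tombstone check is elided):

    ```scala
    // Resolving the session via the session manager refreshes its aliveness;
    // the execution is then looked up within that session.
    val sessionHolder = SparkConnectService.sessionManager
      .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId))
    val executeHolder = sessionHolder.executeHolder(v.getOperationId).getOrElse {
      throw new SparkSQLException(
        errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND",
        messageParameters = Map("handle" -> v.getOperationId))
    }
    ```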
    
    ### Why are the changes needed?
    
    Currently, in `ReattachExecute`, we fetch the `ExecuteHolder` directly without going through the `SessionHolder`, and hence the "aliveness" of the `SessionHolder` is not refreshed.
    
    This could cause long-running queries (in particular, those that do not send `ReleaseExecute` requests) to fail, because the `SessionHolder` would expire from the cache during an active query execution.
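    Conceptually, the session cache evicts a `SessionHolder` once it has been idle longer than a timeout, and before this change a reattach did not count as activity. An illustrative sketch of that expiry predicate (`shouldExpire` and `timeoutMs` are hypothetical names, not the actual API; `lastAccessTimeMs` is exposed via `getSessionHolderInfo`):

    ```scala
    // Hypothetical sketch: a session expires once its idle time exceeds the
    // timeout. Without this fix, reattach-only traffic never resets the clock.
    def shouldExpire(info: SessionHolderInfo, timeoutMs: Long): Boolean =
      System.currentTimeMillis() - info.lastAccessTimeMs > timeoutMs
    ```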
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it fixes a bug where long-running queries may fail when their corresponding `SessionHolder` expires during active query execution.
    
    ### How was this patch tested?
    
    New unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44670 from vicennial/SPARK-46660.
    
    Authored-by: vicennial <venkata.gud...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../SparkConnectReattachExecuteHandler.scala       | 34 +++++++++++-----------
 .../execution/ReattachableExecuteSuite.scala       | 19 ++++++++++++
 .../service/SparkConnectServiceE2ESuite.scala      | 12 ++++----
 .../sql/tests/connect/client/test_reattach.py      | 14 ++++-----
 4 files changed, 47 insertions(+), 32 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index 393b832de878..ecad8c9c73a1 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -29,25 +29,25 @@ class SparkConnectReattachExecuteHandler(
     extends Logging {
 
   def handle(v: proto.ReattachExecuteRequest): Unit = {
-    val executeHolder = SparkConnectService.executionManager
-      .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
-      .getOrElse {
-        if (SparkConnectService.executionManager
-            .getAbandonedTombstone(
-              ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
-            .isDefined) {
-          logDebug(s"Reattach operation abandoned: ${v.getOperationId}")
-          throw new SparkSQLException(
-            errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
-            messageParameters = Map("handle" -> v.getOperationId))
+    val sessionHolder = SparkConnectService.sessionManager
+      .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId))
 
-        } else {
-          logDebug(s"Reattach operation not found: ${v.getOperationId}")
-          throw new SparkSQLException(
-            errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND",
-            messageParameters = Map("handle" -> v.getOperationId))
-        }
+    val executeHolder = sessionHolder.executeHolder(v.getOperationId).getOrElse {
+      if (SparkConnectService.executionManager
+          .getAbandonedTombstone(
+            ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
+          .isDefined) {
+        logDebug(s"Reattach operation abandoned: ${v.getOperationId}")
+        throw new SparkSQLException(
+          errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
+          messageParameters = Map("handle" -> v.getOperationId))
+      } else {
+        logDebug(s"Reattach operation not found: ${v.getOperationId}")
+        throw new SparkSQLException(
+          errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND",
+          messageParameters = Map("handle" -> v.getOperationId))
       }
+    }
     if (!executeHolder.reattachable) {
       logWarning(s"Reattach to not reattachable operation.")
       throw new SparkSQLException(
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index f80229c61980..cf1a2d5032af 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -394,4 +394,23 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
       assertEventuallyNoActiveExecutions()
     }
   }
+
+  test("SPARK-46660: reattach updates aliveness of session holder") {
+    withRawBlockingStub { stub =>
+      val operationId = UUID.randomUUID().toString
+      val iter = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
+      iter.next() // open the iterator, guarantees that the RPC reached the server
+
+      val executionHolder = getExecutionHolder
+      val lastAccessTime = executionHolder.sessionHolder.getSessionHolderInfo.lastAccessTimeMs
+
+      // send reattach
+      val iter2 = stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
+      iter2.next() // open the iterator, guarantees that the RPC reached the server
+      val newAccessTime = executionHolder.sessionHolder.getSessionHolderInfo.lastAccessTimeMs
+
+      assert(newAccessTime > lastAccessTime, "reattach should update session holder access time")
+    }
+  }
 }
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index c0b7eaf5823d..7776148077fc 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -66,22 +66,22 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
       // query1 and query2 could get either an:
       // OPERATION_CANCELED if it happens fast - when closing the session interrupted the queries,
       // and that error got pushed to the client buffers before the client got disconnected.
-      // OPERATION_ABANDONED if it happens slow - when closing the session interrupted the client
-      // RPCs before it pushed out the error above. The client would then get an
+      // INVALID_HANDLE.SESSION_CLOSED if it happens slow - when closing the session interrupted the
+      // client RPCs before it pushed out the error above. The client would then get an
       // INVALID_CURSOR.DISCONNECTED, which it will retry with a ReattachExecute, and then get an
-      // INVALID_HANDLE.OPERATION_ABANDONED.
+      // INVALID_HANDLE.SESSION_CLOSED.
       val query1Error = intercept[SparkException] {
         while (query1.hasNext) query1.next()
       }
       assert(
         query1Error.getMessage.contains("OPERATION_CANCELED") ||
-          query1Error.getMessage.contains("INVALID_HANDLE.OPERATION_ABANDONED"))
+          query1Error.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED"))
       val query2Error = intercept[SparkException] {
         while (query2.hasNext) query2.next()
       }
       assert(
         query2Error.getMessage.contains("OPERATION_CANCELED") ||
-          query2Error.getMessage.contains("INVALID_HANDLE.OPERATION_ABANDONED"))
+          query2Error.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED"))
 
       // No other requests should be allowed in the session, failing with SESSION_CLOSED
       val requestError = intercept[SparkException] {
@@ -115,7 +115,7 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
         }
         assert(
           queryAError.getMessage.contains("OPERATION_CANCELED") ||
-            queryAError.getMessage.contains("INVALID_HANDLE.OPERATION_ABANDONED"))
+            queryAError.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED"))
 
         // B's query can run.
         while (queryB.hasNext) queryB.next()
diff --git a/python/pyspark/sql/tests/connect/client/test_reattach.py b/python/pyspark/sql/tests/connect/client/test_reattach.py
index 5f2cb3c49377..cea0be7008cc 100644
--- a/python/pyspark/sql/tests/connect/client/test_reattach.py
+++ b/python/pyspark/sql/tests/connect/client/test_reattach.py
@@ -84,10 +84,10 @@ class SparkConnectReattachTestCase(ReusedConnectTestCase, SQLTestUtils, PandasOn
         # query1 and query2 could get either an:
         # OPERATION_CANCELED if it happens fast - when closing the session interrupted the queries,
         # and that error got pushed to the client buffers before the client got disconnected.
-        # OPERATION_ABANDONED if it happens slow - when closing the session interrupted the client
-        # RPCs before it pushed out the error above. The client would then get an
+        # INVALID_HANDLE.SESSION_CLOSED if it happens slow - when closing the session interrupted
+        # the client RPCs before it pushed out the error above. The client would then get an
         # INVALID_CURSOR.DISCONNECTED, which it will retry with a ReattachExecute, and then get an
-        # INVALID_HANDLE.OPERATION_ABANDONED.
+        # INVALID_HANDLE.SESSION_CLOSED.
 
         def check_error(q):
             try:
@@ -97,14 +97,10 @@ class SparkConnectReattachTestCase(ReusedConnectTestCase, SQLTestUtils, PandasOn
 
         e = check_error(query1)
         self.assertIsNotNone(e, "An exception has to be thrown")
-        self.assertTrue(
-            "OPERATION_CANCELED" in str(e) or "INVALID_HANDLE.OPERATION_ABANDONED" in str(e)
-        )
+        self.assertTrue("OPERATION_CANCELED" in str(e) or "INVALID_HANDLE.SESSION_CLOSED" in str(e))
         e = check_error(query2)
         self.assertIsNotNone(e, "An exception has to be thrown")
-        self.assertTrue(
-            "OPERATION_CANCELED" in str(e) or "INVALID_HANDLE.OPERATION_ABANDONED" in str(e)
-        )
+        self.assertTrue("OPERATION_CANCELED" in str(e) or "INVALID_HANDLE.SESSION_CLOSED" in str(e))
 
         # query3 has not been submitted before, so it should now fail with SESSION_CLOSED
         e = check_error(query3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
