This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d2bedf4aa16 [SPARK-48056][CONNECT][FOLLOW-UP] Scala Client re-execute 
plan if a SESSION_NOT_FOUND error is raised and no partial response was received
2d2bedf4aa16 is described below

commit 2d2bedf4aa16adfc2f45c192c4b7b954788b3acd
Author: Changgyoo Park <[email protected]>
AuthorDate: Fri Jun 14 16:31:37 2024 +0800

    [SPARK-48056][CONNECT][FOLLOW-UP] Scala Client re-execute plan if a 
SESSION_NOT_FOUND error is raised and no partial response was received
    
    ### What changes were proposed in this pull request?
    
    This change lets a Scala Spark Connect client reattempt execution of a plan 
when it receives a SESSION_NOT_FOUND error from the Spark Connect service if it 
has not received any partial responses.
    
    This is a Scala version of the previous fix of the same issue - 
https://github.com/apache/spark/pull/46297.
    
    ### Why are the changes needed?
    
    Spark Connect clients often get a spurious error from the Spark Connect 
service if the service is busy or the network is congested. This error leads to 
a situation where the client immediately attempts to reattach without the 
service being aware of the client; this leads to a query failure.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Previously, a Scala Spark Connect client would fail with the error code 
"INVALID_HANDLE.SESSION_NOT_FOUND" in the very first attempt to make a request 
to the service, but with this change, the client will automatically retry.
    
    ### How was this patch tested?
    
    Attached unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46971 from changgyoopark-db/SPARK-48056.
    
    Authored-by: Changgyoo Park <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../connect/client/SparkConnectClientSuite.scala   | 28 ++++++++++++++++++++++
 .../ExecutePlanResponseReattachableIterator.scala  | 14 +++++++----
 2 files changed, 38 insertions(+), 4 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 55f962b2a52c..46aeaeff43d2 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -530,6 +530,25 @@ class SparkConnectClientSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
     assert(reattachableIter.resultComplete)
   }
 
+  test("SPARK-48056: Client execute gets INVALID_HANDLE.SESSION_NOT_FOUND and 
proceeds") {
+    startDummyServer(0)
+    client = SparkConnectClient
+      .builder()
+      .connectionString(s"sc://localhost:${server.getPort}")
+      .enableReattachableExecute()
+      .build()
+    service.errorToThrowOnExecute = Some(
+      new StatusRuntimeException(
+        Status.INTERNAL.withDescription("INVALID_HANDLE.SESSION_NOT_FOUND")))
+
+    val plan = buildPlan("select * from range(1)")
+    val iter = client.execute(plan)
+    val reattachableIter =
+      ExecutePlanResponseReattachableIterator.fromIterator(iter)
+    reattachableIter.foreach(_ => ())
+    assert(reattachableIter.resultComplete)
+  }
+
   test("GRPC stub unary call throws error immediately") {
     // Spark Connect error retry handling depends on the error being returned 
from the unary
     // call immediately.
@@ -609,6 +628,8 @@ class DummySparkConnectService() extends 
SparkConnectServiceGrpc.SparkConnectSer
   private val inputArtifactRequests: mutable.ListBuffer[AddArtifactsRequest] =
     mutable.ListBuffer.empty
 
+  var errorToThrowOnExecute: Option[Throwable] = None
+
   private[sql] def getAndClearLatestInputPlan(): proto.Plan = {
     val plan = inputPlan
     inputPlan = null
@@ -624,6 +645,13 @@ class DummySparkConnectService() extends 
SparkConnectServiceGrpc.SparkConnectSer
   override def executePlan(
       request: ExecutePlanRequest,
       responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    if (errorToThrowOnExecute.isDefined) {
+      val error = errorToThrowOnExecute.get
+      errorToThrowOnExecute = None
+      responseObserver.onError(error)
+      return
+    }
+
     // Reply with a dummy response using the same client ID
     val requestSessionId = request.getSessionId
     val operationId = if (request.hasOperationId) {
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 74f13272a365..f3c13c9c2c4d 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -42,7 +42,8 @@ import 
org.apache.spark.sql.connect.client.GrpcRetryHandler.RetryException
  * ReattachExecute request. ReattachExecute request is provided the responseId 
of last returned
  * ExecutePlanResponse on the iterator to return a new iterator from server 
that continues after
  * that. If the initial ExecutePlan did not even reach the server, and hence 
reattach fails with
- * INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
+ * INVALID_HANDLE.OPERATION_NOT_FOUND or INVALID_HANDLE.SESSION_NOT_FOUND, we 
attempt to retry
+ * ExecutePlan.
  *
  * In reattachable execute the server does buffer some responses in case the 
client needs to
  * backtrack. To let server release this buffer sooner, this iterator 
asynchronously sends
@@ -66,7 +67,8 @@ class ExecutePlanResponseReattachableIterator(
     // Add operation id, if not present.
     // with operationId set by the client, the client can use it to try to 
reattach on error
     // even before getting the first response. If the operation in fact didn't 
even reach the
-    // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND error.
+    // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND or
+    // INVALID_HANDLE.SESSION_NOT_FOUND error.
     UUID.randomUUID.toString
   }
 
@@ -234,10 +236,14 @@ class ExecutePlanResponseReattachableIterator(
     } catch {
       case ex: StatusRuntimeException
           if Option(StatusProto.fromThrowable(ex))
-            
.exists(_.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND")) =>
+            .exists(ex => {
+              ex.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND") ||
+              ex.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND")
+            }) =>
         if (lastReturnedResponseId.isDefined) {
           throw new IllegalStateException(
-            "OPERATION_NOT_FOUND on the server but responses were already 
received from it.",
+            "OPERATION_NOT_FOUND/SESSION_NOT_FOUND on the server but responses 
were already " +
+              "received from it.",
             ex)
         }
         // Try a new ExecutePlan, and throw upstream for retry.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to