This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new d077a7a85fd7 [SPARK-52397][CONNECT] Idempotent ExecutePlan: the second 
ExecutePlan with same operationId and plan reattaches
d077a7a85fd7 is described below

commit d077a7a85fd721b109122a6c1375b718eeb8a306
Author: Xi Lyu <xi....@databricks.com>
AuthorDate: Thu Jun 5 10:04:58 2025 -0400

    [SPARK-52397][CONNECT] Idempotent ExecutePlan: the second ExecutePlan with 
same operationId and plan reattaches
    
    ### What changes were proposed in this pull request?
    
    In Spark Connect, queries can fail with the error 
INVALID_HANDLE.OPERATION_ALREADY_EXISTS, when a client retries an ExecutePlan 
RPC—often due to transient network issues—causing the server to receive the 
same request multiple times. Since each ExecutePlan request includes an 
operation_id, the server interprets the duplicate as an attempt to create an 
already existing operation, which results in the OPERATION_ALREADY_EXISTS 
exception. This behavior interrupts query execution and breaks [...]
    
    To resolve this, the PR introduces idempotent handling of ExecutePlan on 
the server side. When a request with a previously seen operation_id and the 
same plan is received, instead of returning an error, the server now reattaches 
the response stream to the already running execution associated with that 
operation. This ensures that retries due to network flakiness no longer result 
in failed queries, thereby improving the resilience and robustness of query 
executions.
    
    ### Why are the changes needed?
    
    It will improve the stability of Spark Connect in case of transient network 
issues.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New tests were added to ReattachableExecuteSuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51084 from xi-db/fix_operation_already_exists.
    
    Authored-by: Xi Lyu <xi....@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 273a5e79ec73db2e1460bdf77ca77ebbb174e083)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../service/SparkConnectExecutePlanHandler.scala   | 37 +++++----
 .../service/SparkConnectExecutionManager.scala     | 92 +++++++++++++++++++---
 .../SparkConnectReattachExecuteHandler.scala       | 28 ++-----
 .../execution/ReattachableExecuteSuite.scala       | 51 ++++++++++++
 4 files changed, 159 insertions(+), 49 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
index 73a20e448be8..027f4517cf3b 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
@@ -19,32 +19,35 @@ package org.apache.spark.sql.connect.service
 
 import io.grpc.stub.StreamObserver
 
+import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
 
 class SparkConnectExecutePlanHandler(responseObserver: 
StreamObserver[proto.ExecutePlanResponse])
     extends Logging {
 
   def handle(v: proto.ExecutePlanRequest): Unit = {
-    val executeHolder = 
SparkConnectService.executionManager.createExecuteHolder(v)
-    try {
-      executeHolder.eventsManager.postStarted()
-      executeHolder.start()
-    } catch {
-      // Errors raised before the execution holder has finished spawning a 
thread are considered
-      // plan execution failure, and the client should not try reattaching it 
afterwards.
-      case t: Throwable =>
-        
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
-        throw t
+    val previousSessionId = v.hasClientObservedServerSideSessionId match {
+      case true => Some(v.getClientObservedServerSideSessionId)
+      case false => None
     }
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, 
previousSessionId)
+    val executeKey = ExecuteKey(v, sessionHolder)
 
-    try {
-      val responseSender =
-        new 
ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, 
responseObserver)
-      executeHolder.runGrpcResponseSender(responseSender)
-    } finally {
-      executeHolder.afterInitialRPC()
+    SparkConnectService.executionManager.getExecuteHolder(executeKey) match {
+      case None =>
+        // Create a new execute holder and attach to it.
+        SparkConnectService.executionManager
+          .createExecuteHolderAndAttach(executeKey, v, sessionHolder, 
responseObserver)
+      case Some(executeHolder) if 
executeHolder.request.getPlan.equals(v.getPlan) =>
+        // If the execute holder already exists with the same plan, reattach 
to it.
+        SparkConnectService.executionManager
+          .reattachExecuteHolder(executeHolder, responseObserver, None)
+      case Some(_) =>
+        throw new SparkSQLException(
+          errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
+          messageParameters = Map("handle" -> executeKey.operationId))
     }
   }
 }
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index c158de48574f..32080912ab70 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -26,12 +26,14 @@ import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import com.google.common.cache.CacheBuilder
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.{SparkEnv, SparkSQLException}
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import 
org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE,
 CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, 
CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
+import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
 import org.apache.spark.util.ThreadUtils
 
 // Unique key identifying execution by combination of user, session and 
operation id
@@ -83,17 +85,10 @@ private[connect] class SparkConnectExecutionManager() 
extends Logging {
   /**
    * Create a new ExecuteHolder and register it with this global manager and 
with its session.
    */
-  private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): 
ExecuteHolder = {
-    val previousSessionId = request.hasClientObservedServerSideSessionId match 
{
-      case true => Some(request.getClientObservedServerSideSessionId)
-      case false => None
-    }
-    val sessionHolder = SparkConnectService
-      .getOrCreateIsolatedSession(
-        request.getUserContext.getUserId,
-        request.getSessionId,
-        previousSessionId)
-    val executeKey = ExecuteKey(request, sessionHolder)
+  private[connect] def createExecuteHolder(
+      executeKey: ExecuteKey,
+      request: proto.ExecutePlanRequest,
+      sessionHolder: SessionHolder): ExecuteHolder = {
     val executeHolder = executions.compute(
       executeKey,
       (executeKey, oldExecuteHolder) => {
@@ -123,6 +118,20 @@ private[connect] class SparkConnectExecutionManager() 
extends Logging {
     executeHolder
   }
 
+  /**
+   * Create a new ExecuteHolder and register it with this global manager and 
with its session.
+   */
+  private[connect] def createExecuteHolder(v: proto.ExecutePlanRequest): 
ExecuteHolder = {
+    val previousSessionId = v.hasClientObservedServerSideSessionId match {
+      case true => Some(v.getClientObservedServerSideSessionId)
+      case false => None
+    }
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, 
previousSessionId)
+    val executeKey = ExecuteKey(v, sessionHolder)
+    createExecuteHolder(executeKey, v, sessionHolder)
+  }
+
   /**
    * Remove an ExecuteHolder from this global manager and from its session. 
Interrupt the
    * execution if still running, free all resources.
@@ -159,6 +168,67 @@ private[connect] class SparkConnectExecutionManager() 
extends Logging {
     Option(executions.get(key))
   }
 
+  /**
+   * Create a new ExecuteHolder, register it with this global manager and with 
its session, and
+   * attach the given response observer to it.
+   */
+  private[connect] def createExecuteHolderAndAttach(
+      executeKey: ExecuteKey,
+      request: proto.ExecutePlanRequest,
+      sessionHolder: SessionHolder,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse]): 
ExecuteHolder = {
+    val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
+    try {
+      executeHolder.eventsManager.postStarted()
+      executeHolder.start()
+    } catch {
+      // Errors raised before the execution holder has finished spawning a 
thread are considered
+      // plan execution failure, and the client should not try reattaching it 
afterwards.
+      case t: Throwable =>
+        removeExecuteHolder(executeHolder.key)
+        throw t
+    }
+
+    try {
+      val responseSender =
+        new 
ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, 
responseObserver)
+      executeHolder.runGrpcResponseSender(responseSender)
+    } finally {
+      executeHolder.afterInitialRPC()
+    }
+    executeHolder
+  }
+
+  /**
+   * Reattach the given response observer to the given ExecuteHolder.
+   */
+  private[connect] def reattachExecuteHolder(
+      executeHolder: ExecuteHolder,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse],
+      lastConsumedResponseId: Option[String]): Unit = {
+    if (!executeHolder.reattachable) {
+      logWarning(log"Reattach to not reattachable operation.")
+      throw new SparkSQLException(
+        errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
+        messageParameters = Map.empty)
+    } else if (executeHolder.isOrphan()) {
+      logWarning(log"Reattach to an orphan operation.")
+      removeExecuteHolder(executeHolder.key)
+      throw new IllegalStateException("Operation was orphaned because of an 
internal error.")
+    }
+
+    val responseSender =
+      new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, 
responseObserver)
+    lastConsumedResponseId match {
+      case Some(lastResponseId) =>
+        // start from response after lastResponseId
+        executeHolder.runGrpcResponseSender(responseSender, lastResponseId)
+      case None =>
+        // start from the start of the stream.
+        executeHolder.runGrpcResponseSender(responseSender)
+    }
+  }
+
   private[connect] def removeAllExecutionsForSession(key: SessionKey): Unit = {
     executions.forEach((_, executeHolder) => {
       if (executeHolder.sessionHolder.key == key) {
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index 824fa54120b2..653093411045 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -22,8 +22,6 @@ import io.grpc.stub.StreamObserver
 import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
-import org.apache.spark.sql.connect.service.ExecuteKey
 
 class SparkConnectReattachExecuteHandler(
     responseObserver: StreamObserver[proto.ExecutePlanResponse])
@@ -57,25 +55,13 @@ class SparkConnectReattachExecuteHandler(
             messageParameters = Map("handle" -> v.getOperationId))
         }
       }
-    if (!executeHolder.reattachable) {
-      logWarning(s"Reattach to not reattachable operation.")
-      throw new SparkSQLException(
-        errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
-        messageParameters = Map.empty)
-    } else if (executeHolder.isOrphan()) {
-      logWarning("Reattach to an orphan operation.")
-      
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
-      throw new IllegalStateException("Operation was orphaned because of an 
internal error.")
-    }
 
-    val responseSender =
-      new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, 
responseObserver)
-    if (v.hasLastResponseId) {
-      // start from response after lastResponseId
-      executeHolder.runGrpcResponseSender(responseSender, v.getLastResponseId)
-    } else {
-      // start from the start of the stream.
-      executeHolder.runGrpcResponseSender(responseSender)
-    }
+    SparkConnectService.executionManager.reattachExecuteHolder(
+      executeHolder,
+      responseObserver,
+      v.hasLastResponseId match {
+        case true => Some(v.getLastResponseId)
+        case false => None
+      })
   }
 }
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 7cab12871300..d6c6d9fa04fe 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -452,4 +452,55 @@ class ReattachableExecuteSuite extends 
SparkConnectServerTest {
       assert(re.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND"))
     }
   }
+
+  test("ExecutePlan RPC is idempotent: second ExecutePlan with same 
operationId reattaches") {
+    withRawBlockingStub { stub =>
+      val operationId = UUID.randomUUID().toString
+      val iter = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = 
operationId))
+      // open the iterator, guarantees that the RPC reached the server,
+      // and get the responseId of the first response
+      val firstResponseId = iter.next().getResponseId
+
+      // send execute plan again, it will attach to the same execution,
+      // instead of throwing INVALID_HANDLE.OPERATION_ALREADY_EXISTS error
+      val iter2 = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = 
operationId))
+      // the first response should be the same as the one we got before, 
because
+      // lastConsumedResponseId is unset and the server will start from the 
start of the stream.
+      assert(iter2.next().getResponseId == firstResponseId)
+
+      // should result in INVALID_CURSOR.DISCONNECTED error on the original 
iterator
+      val e = intercept[StatusRuntimeException] {
+        while (iter.hasNext) iter.next()
+      }
+      assert(e.getMessage.contains("INVALID_CURSOR.DISCONNECTED"))
+
+      // the second iterator should be able to continue
+      while (iter2.hasNext) {
+        iter2.next()
+      }
+    }
+  }
+
+  test("The second ExecutePlan with same operationId but a different plan will 
fail") {
+    withRawBlockingStub { stub =>
+      val operationId = UUID.randomUUID().toString
+      val iter = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = 
operationId))
+      // open the iterator, which guarantees that the RPC reached the server;
+      // the first response itself is not used in this test
+      iter.next()
+
+      // send an ExecutePlan with the same operation id but a different plan;
+      // it will fail with an INVALID_HANDLE.OPERATION_ALREADY_EXISTS error
+      val SMALL_RESULTS_QUERY = "select * from range(1000)"
+      val iter2 = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(SMALL_RESULTS_QUERY), operationId = 
operationId))
+      val e = intercept[StatusRuntimeException] {
+        iter2.next()
+      }
+      assert(e.getMessage.contains("INVALID_HANDLE.OPERATION_ALREADY_EXISTS"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to