This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new d077a7a85fd7 [SPARK-52397][CONNECT] Idempotent ExecutePlan: the second ExecutePlan with same operationId and plan reattaches
d077a7a85fd7 is described below

commit d077a7a85fd721b109122a6c1375b718eeb8a306
Author: Xi Lyu <xi....@databricks.com>
AuthorDate: Thu Jun 5 10:04:58 2025 -0400

    [SPARK-52397][CONNECT] Idempotent ExecutePlan: the second ExecutePlan with same operationId and plan reattaches

    ### What changes were proposed in this pull request?

    In Spark Connect, queries can fail with the error INVALID_HANDLE.OPERATION_ALREADY_EXISTS when a client retries an ExecutePlan RPC, often due to transient network issues, causing the server to receive the same request multiple times. Since each ExecutePlan request includes an operation_id, the server interprets the duplicate as an attempt to create an already existing operation, which results in the OPERATION_ALREADY_EXISTS exception. This behavior interrupts query execution and breaks [...]

    To resolve this, the PR introduces idempotent handling of ExecutePlan on the server side. When a request arrives with a previously seen operation_id and the same plan, the server no longer returns an error; instead, it reattaches the response stream to the already running execution associated with that operation. Retries caused by network flakiness therefore no longer result in failed queries, improving the resilience and robustness of query execution.

    ### Why are the changes needed?

    They improve the stability of Spark Connect in the presence of transient network issues.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New tests were added.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51084 from xi-db/fix_operation_already_exists.
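To make the new dispatch concrete, here is a minimal, self-contained Scala sketch of the three-way decision the server now makes per ExecutePlan request. All names below (ExecutionManager, Execution, handleExecutePlan, the String return value) are simplified, hypothetical stand-ins, not the real Spark Connect API; the actual implementation lives in SparkConnectExecutePlanHandler and SparkConnectExecutionManager in the diff below.

    import scala.collection.concurrent.TrieMap

    // Hypothetical, simplified model of the idempotent ExecutePlan dispatch.
    // These types are illustrative stand-ins, not the real Spark Connect classes.
    final case class ExecuteKey(userId: String, sessionId: String, operationId: String)
    final case class Execution(key: ExecuteKey, plan: String)

    class ExecutionManager {
      private val executions = TrieMap.empty[ExecuteKey, Execution]

      def handleExecutePlan(key: ExecuteKey, plan: String): String =
        executions.get(key) match {
          case None =>
            // First request for this operationId: register and start the execution.
            executions.put(key, Execution(key, plan))
            "created " + key.operationId
          case Some(existing) if existing.plan == plan =>
            // Duplicate request (e.g. a network retry): reattach to the running
            // execution and replay the response stream from the beginning.
            "reattached " + key.operationId
          case Some(_) =>
            // Same operationId but a different plan: a genuine conflict.
            throw new IllegalStateException("INVALID_HANDLE.OPERATION_ALREADY_EXISTS")
        }
    }

A client that retries an unchanged ExecutePlan thus lands in the second branch, which is the behavior the new tests in ReattachableExecuteSuite exercise.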
Authored-by: Xi Lyu <xi....@databricks.com>
Signed-off-by: Herman van Hovell <her...@databricks.com>
(cherry picked from commit 273a5e79ec73db2e1460bdf77ca77ebbb174e083)
Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../service/SparkConnectExecutePlanHandler.scala   | 37 +++++----
 .../service/SparkConnectExecutionManager.scala     | 92 +++++++++++++++++++---
 .../SparkConnectReattachExecuteHandler.scala       | 28 ++-----
 .../execution/ReattachableExecuteSuite.scala       | 51 ++++++++++++
 4 files changed, 159 insertions(+), 49 deletions(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
index 73a20e448be8..027f4517cf3b 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
@@ -19,32 +19,35 @@ package org.apache.spark.sql.connect.service
 
 import io.grpc.stub.StreamObserver
 
+import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
 
 class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse])
     extends Logging {
 
   def handle(v: proto.ExecutePlanRequest): Unit = {
-    val executeHolder = SparkConnectService.executionManager.createExecuteHolder(v)
-    try {
-      executeHolder.eventsManager.postStarted()
-      executeHolder.start()
-    } catch {
-      // Errors raised before the execution holder has finished spawning a thread are considered
-      // plan execution failure, and the client should not try reattaching it afterwards.
-      case t: Throwable =>
-        SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
-        throw t
+    val previousSessionId = v.hasClientObservedServerSideSessionId match {
+      case true => Some(v.getClientObservedServerSideSessionId)
+      case false => None
     }
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, previousSessionId)
+    val executeKey = ExecuteKey(v, sessionHolder)
 
-    try {
-      val responseSender =
-        new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
-      executeHolder.runGrpcResponseSender(responseSender)
-    } finally {
-      executeHolder.afterInitialRPC()
+    SparkConnectService.executionManager.getExecuteHolder(executeKey) match {
+      case None =>
+        // Create a new execute holder and attach to it.
+        SparkConnectService.executionManager
+          .createExecuteHolderAndAttach(executeKey, v, sessionHolder, responseObserver)
+      case Some(executeHolder) if executeHolder.request.getPlan.equals(v.getPlan) =>
+        // If the execute holder already exists with the same plan, reattach to it.
+        SparkConnectService.executionManager
+          .reattachExecuteHolder(executeHolder, responseObserver, None)
+      case Some(_) =>
+        throw new SparkSQLException(
+          errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
+          messageParameters = Map("handle" -> executeKey.operationId))
     }
   }
 }

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index c158de48574f..32080912ab70 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -26,12 +26,14 @@ import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import com.google.common.cache.CacheBuilder
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.{SparkEnv, SparkSQLException}
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
+import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
 import org.apache.spark.util.ThreadUtils
 
 // Unique key identifying execution by combination of user, session and operation id
@@ -83,17 +85,10 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
   /**
    * Create a new ExecuteHolder and register it with this global manager and with its session.
    */
-  private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): ExecuteHolder = {
-    val previousSessionId = request.hasClientObservedServerSideSessionId match {
-      case true => Some(request.getClientObservedServerSideSessionId)
-      case false => None
-    }
-    val sessionHolder = SparkConnectService
-      .getOrCreateIsolatedSession(
-        request.getUserContext.getUserId,
-        request.getSessionId,
-        previousSessionId)
-    val executeKey = ExecuteKey(request, sessionHolder)
+  private[connect] def createExecuteHolder(
+      executeKey: ExecuteKey,
+      request: proto.ExecutePlanRequest,
+      sessionHolder: SessionHolder): ExecuteHolder = {
     val executeHolder = executions.compute(
       executeKey,
       (executeKey, oldExecuteHolder) => {
@@ -123,6 +118,20 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     executeHolder
   }
 
+  /**
+   * Create a new ExecuteHolder and register it with this global manager and with its session.
+   */
+  private[connect] def createExecuteHolder(v: proto.ExecutePlanRequest): ExecuteHolder = {
+    val previousSessionId = v.hasClientObservedServerSideSessionId match {
+      case true => Some(v.getClientObservedServerSideSessionId)
+      case false => None
+    }
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, previousSessionId)
+    val executeKey = ExecuteKey(v, sessionHolder)
+    createExecuteHolder(executeKey, v, sessionHolder)
+  }
+
   /**
    * Remove an ExecuteHolder from this global manager and from its session. Interrupt the
    * execution if still running, free all resources.
@@ -159,6 +168,67 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     Option(executions.get(key))
   }
 
+  /**
+   * Create a new ExecuteHolder, register it with this global manager and with its session, and
+   * attach the given response observer to it.
+   */
+  private[connect] def createExecuteHolderAndAttach(
+      executeKey: ExecuteKey,
+      request: proto.ExecutePlanRequest,
+      sessionHolder: SessionHolder,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse]): ExecuteHolder = {
+    val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
+    try {
+      executeHolder.eventsManager.postStarted()
+      executeHolder.start()
+    } catch {
+      // Errors raised before the execution holder has finished spawning a thread are considered
+      // plan execution failure, and the client should not try reattaching it afterwards.
+      case t: Throwable =>
+        removeExecuteHolder(executeHolder.key)
+        throw t
+    }
+
+    try {
+      val responseSender =
+        new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
+      executeHolder.runGrpcResponseSender(responseSender)
+    } finally {
+      executeHolder.afterInitialRPC()
+    }
+    executeHolder
+  }
+
+  /**
+   * Reattach the given response observer to the given ExecuteHolder.
+   */
+  private[connect] def reattachExecuteHolder(
+      executeHolder: ExecuteHolder,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse],
+      lastConsumedResponseId: Option[String]): Unit = {
+    if (!executeHolder.reattachable) {
+      logWarning(log"Reattach to not reattachable operation.")
+      throw new SparkSQLException(
+        errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
+        messageParameters = Map.empty)
+    } else if (executeHolder.isOrphan()) {
+      logWarning(log"Reattach to an orphan operation.")
+      removeExecuteHolder(executeHolder.key)
+      throw new IllegalStateException("Operation was orphaned because of an internal error.")
+    }
+
+    val responseSender =
+      new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
+    lastConsumedResponseId match {
+      case Some(lastResponseId) =>
+        // start from response after lastResponseId
+        executeHolder.runGrpcResponseSender(responseSender, lastResponseId)
+      case None =>
+        // start from the start of the stream.
+        executeHolder.runGrpcResponseSender(responseSender)
+    }
+  }
+
   private[connect] def removeAllExecutionsForSession(key: SessionKey): Unit = {
     executions.forEach((_, executeHolder) => {
       if (executeHolder.sessionHolder.key == key) {

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index 824fa54120b2..653093411045 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -22,8 +22,6 @@ import io.grpc.stub.StreamObserver
 import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
-import org.apache.spark.sql.connect.service.ExecuteKey
 
 class SparkConnectReattachExecuteHandler(
     responseObserver: StreamObserver[proto.ExecutePlanResponse])
@@ -57,25 +55,13 @@ class SparkConnectReattachExecuteHandler(
           messageParameters = Map("handle" -> v.getOperationId))
       }
     }
-    if (!executeHolder.reattachable) {
-      logWarning(s"Reattach to not reattachable operation.")
-      throw new SparkSQLException(
-        errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
-        messageParameters = Map.empty)
-    } else if (executeHolder.isOrphan()) {
-      logWarning("Reattach to an orphan operation.")
-      SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
-      throw new IllegalStateException("Operation was orphaned because of an internal error.")
-    }
-    val responseSender =
-      new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver)
-    if (v.hasLastResponseId) {
-      // start from response after lastResponseId
-      executeHolder.runGrpcResponseSender(responseSender, v.getLastResponseId)
-    } else {
-      // start from the start of the stream.
-      executeHolder.runGrpcResponseSender(responseSender)
-    }
+    SparkConnectService.executionManager.reattachExecuteHolder(
+      executeHolder,
+      responseObserver,
+      v.hasLastResponseId match {
+        case true => Some(v.getLastResponseId)
+        case false => None
+      })
   }
 }

diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 7cab12871300..d6c6d9fa04fe 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -452,4 +452,55 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
       assert(re.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND"))
     }
   }
+
+  test("ExecutePlan RPC is idempotent: second ExecutePlan with same operationId reattaches") {
+    withRawBlockingStub { stub =>
+      val operationId = UUID.randomUUID().toString
+      val iter = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
+      // Open the iterator, which guarantees that the RPC reached the server,
+      // and get the responseId of the first response.
+      val firstResponseId = iter.next().getResponseId
+
+      // Send the ExecutePlan request again; it will attach to the same execution
+      // instead of throwing an INVALID_HANDLE.OPERATION_ALREADY_EXISTS error.
+      val iter2 = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
+      // The first response should be the same as the one we got before, because
+      // lastConsumedResponseId is unset and the server will start from the start of the stream.
+      assert(iter2.next().getResponseId == firstResponseId)
+
+      // This should result in an INVALID_CURSOR.DISCONNECTED error on the original iterator.
+      val e = intercept[StatusRuntimeException] {
+        while (iter.hasNext) iter.next()
+      }
+      assert(e.getMessage.contains("INVALID_CURSOR.DISCONNECTED"))
+
+      // The second iterator should be able to continue.
+      while (iter2.hasNext) {
+        iter2.next()
+      }
+    }
+  }
+
+  test("The second ExecutePlan with same operationId but a different plan will fail") {
+    withRawBlockingStub { stub =>
+      val operationId = UUID.randomUUID().toString
+      val iter = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
+      // Open the iterator, which guarantees that the RPC reached the server.
+      iter.next()
+
+      // Send another ExecutePlan request with the same operation id but a different plan;
+      // it will fail with an INVALID_HANDLE.OPERATION_ALREADY_EXISTS error.
+      val SMALL_RESULTS_QUERY = "select * from range(1000)"
+      val iter2 = stub.executePlan(
+        buildExecutePlanRequest(buildPlan(SMALL_RESULTS_QUERY), operationId = operationId))
+      val e = intercept[StatusRuntimeException] {
+        iter2.next()
+      }
+      assert(e.getMessage.contains("INVALID_HANDLE.OPERATION_ALREADY_EXISTS"))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org