This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 51e8634a5883 [SPARK-47380][CONNECT] Ensure on the server side that the SparkSession is the same 51e8634a5883 is described below commit 51e8634a5883f1816bb82c19b6e91c3516eee6c4 Author: Nemanja Boric <nemanja.bo...@databricks.com> AuthorDate: Mon Mar 18 15:44:29 2024 -0400 [SPARK-47380][CONNECT] Ensure on the server side that the SparkSession is the same ### What changes were proposed in this pull request? In this PR we change the client behaviour to send the previously observed server-side session id, so that the server can validate that the client has previously been talking to this specific session. Previously this was only validated on the client side, which meant that the server actually executed the request for the wrong session before the client threw an error (once the response from the server was obtained). ### Why are the changes needed? Without this change, the server can execute the client command on the wrong Spark session before the client figures out that it is a different session. ### Does this PR introduce _any_ user-facing change? The error message now pops up differently (it used to be a slightly different message when validated on the client). ### How was this patch tested? Existing unit tests, a new unit test, a new e2e test, and manual testing. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45499 from nemanja-boric-databricks/workspace. 
Authored-by: Nemanja Boric <nemanja.bo...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../src/main/resources/error/error-classes.json | 5 + .../src/main/protobuf/spark/connect/base.proto | 54 ++++ .../sql/connect/client/ResponseValidator.scala | 16 +- .../sql/connect/client/SparkConnectClient.scala | 45 ++- .../service/SparkConnectAddArtifactsHandler.scala | 7 +- .../service/SparkConnectAnalyzeHandler.scala | 7 +- .../SparkConnectArtifactStatusesHandler.scala | 18 +- .../service/SparkConnectConfigHandler.scala | 9 +- .../service/SparkConnectExecutionManager.scala | 9 +- .../SparkConnectFetchErrorDetailsHandler.scala | 6 +- .../service/SparkConnectInterruptHandler.scala | 6 +- .../SparkConnectReattachExecuteHandler.scala | 8 +- .../SparkConnectReleaseExecuteHandler.scala | 4 +- .../SparkConnectReleaseSessionHandler.scala | 2 + .../sql/connect/service/SparkConnectService.scala | 9 +- .../service/SparkConnectSessionManager.scala | 29 +- .../execution/ReattachableExecuteSuite.scala | 8 +- .../connect/planner/SparkConnectServiceSuite.scala | 8 +- .../service/ArtifactStatusesHandlerSuite.scala | 1 + .../service/FetchErrorDetailsHandlerSuite.scala | 12 +- .../service/SparkConnectServiceE2ESuite.scala | 20 ++ .../service/SparkConnectSessionManagerSuite.scala | 38 ++- ...-error-conditions-invalid-handle-error-class.md | 4 + python/pyspark/sql/connect/client/core.py | 12 + python/pyspark/sql/connect/proto/base_pb2.py | 328 ++++++++++----------- python/pyspark/sql/connect/proto/base_pb2.pyi | 210 +++++++++++++ 26 files changed, 655 insertions(+), 220 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 9362b8342abf..b5a0089fb2c8 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -2083,6 +2083,11 @@ "Operation not found." 
] }, + "SESSION_CHANGED" : { + "message" : [ + "The existing Spark server driver instance has restarted. Please reconnect." + ] + }, "SESSION_CLOSED" : { "message" : [ "Session was closed." diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index cb9dbe62c193..bcc7edc55550 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -66,6 +66,12 @@ message AnalyzePlanRequest { // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. + optional string client_observed_server_side_session_id = 17; + // (Required) User context UserContext user_context = 2; @@ -281,6 +287,12 @@ message ExecutePlanRequest { // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. + optional string client_observed_server_side_session_id = 8; + // (Required) User context // // user_context.user_id and session+id both identify a unique remote spark session on the @@ -443,6 +455,12 @@ message ConfigRequest { // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. 
+ optional string client_observed_server_side_session_id = 8; + // (Required) User context UserContext user_context = 2; @@ -536,6 +554,12 @@ message AddArtifactsRequest { // User context UserContext user_context = 2; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. + optional string client_observed_server_side_session_id = 7; + // Provides optional information about the client sending the request. This field // can be used for language or version specific information and is only intended for // logging purposes and will not be interpreted by the server. @@ -630,6 +654,12 @@ message ArtifactStatusesRequest { // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. + optional string client_observed_server_side_session_id = 5; + // User context UserContext user_context = 2; @@ -673,6 +703,12 @@ message InterruptRequest { // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. + optional string client_observed_server_side_session_id = 7; + // (Required) User context UserContext user_context = 2; @@ -738,6 +774,12 @@ message ReattachExecuteRequest { // This must be an id of existing session. string session_id = 1; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. 
+ optional string client_observed_server_side_session_id = 6; + // (Required) User context // // user_context.user_id and session+id both identify a unique remote spark session on the @@ -772,6 +814,12 @@ message ReleaseExecuteRequest { // This must be an id of existing session. string session_id = 1; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. + optional string client_observed_server_side_session_id = 7; + // (Required) User context // // user_context.user_id and session+id both identify a unique remote spark session on the @@ -856,6 +904,12 @@ message FetchErrorDetailsRequest { // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`. string session_id = 1; + // (Optional) + // + // Server-side generated idempotency key from the previous responses (if any). Server + // can use this to validate that the server side session has not changed. + optional string client_observed_server_side_session_id = 5; + // User context UserContext user_context = 2; diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala index 22c5505e7d45..29272c96132b 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala @@ -21,8 +21,8 @@ import io.grpc.stub.StreamObserver import org.apache.spark.internal.Logging -// This is common logic to be shared between different stub instances to validate responses as -// seen by the client. +// This is common logic to be shared between different stub instances to keep the server-side +// session id and to validate responses as seen by the client. 
class ResponseValidator extends Logging { // Server side session ID, used to detect if the server side session changed. This is set upon @@ -30,6 +30,18 @@ class ResponseValidator extends Logging { // do not use server-side streaming. private var serverSideSessionId: Option[String] = None + // Returns the server side session ID, used to send it back to the server in the follow-up + // requests so the server can validate it session id against the previous requests. + def getServerSideSessionId: Option[String] = serverSideSessionId + + /** + * Hijacks the stored server side session ID with the given suffix. Used for testing to make + * sure that server is validating the session ID. + */ + private[sql] def hijackServerSideSessionIdForTesting(suffix: String): Unit = { + serverSideSessionId = Some(serverSideSessionId.getOrElse("") + suffix) + } + def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = { val response = fn val field = response.getDescriptorForType.findFieldByName("server_side_session_id") diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index cd1dfbd2e734..746aaca6f559 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -63,6 +63,14 @@ private[sql] class SparkConnectClient( // a new client will create a new session ID. private[sql] val sessionId: String = configuration.sessionId.getOrElse(UUID.randomUUID.toString) + /** + * Hijacks the stored server side session ID with the given suffix. Used for testing to make + * sure that server is validating the session ID. 
+ */ + private[sql] def hijackServerSideSessionIdForTesting(suffix: String) = { + stubState.responseValidator.hijackServerSideSessionIdForTesting(suffix) + } + private[sql] val artifactManager: ArtifactManager = { new ArtifactManager(configuration, sessionId, bstub, stub) } @@ -73,6 +81,14 @@ private[sql] class SparkConnectClient( private[sql] def uploadAllClassFileArtifacts(): Unit = artifactManager.uploadAllClassFileArtifacts() + /** + * Returns the server-side session id obtained from the first request, if there was a request + * already. + */ + private def serverSideSessionId: Option[String] = { + stubState.responseValidator.getServerSideSessionId + } + /** * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server. * @return @@ -99,11 +115,11 @@ private[sql] class SparkConnectClient( .setSessionId(sessionId) .setClientType(userAgent) .addAllTags(tags.get.toSeq.asJava) - .build() + serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session)) if (configuration.useReattachableExecute) { - bstub.executePlanReattachable(request) + bstub.executePlanReattachable(request.build()) } else { - bstub.executePlan(request) + bstub.executePlan(request.build()) } } @@ -119,8 +135,8 @@ private[sql] class SparkConnectClient( .setSessionId(sessionId) .setClientType(userAgent) .setUserContext(userContext) - .build() - bstub.config(request) + serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session)) + bstub.config(request.build()) } /** @@ -207,8 +223,8 @@ private[sql] class SparkConnectClient( .setUserContext(userContext) .setSessionId(sessionId) .setClientType(userAgent) - .build() - analyze(request) + serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session)) + analyze(request.build()) } private[sql] def interruptAll(): proto.InterruptResponse = { @@ -218,8 +234,8 @@ private[sql] class SparkConnectClient( .setSessionId(sessionId) .setClientType(userAgent) 
.setInterruptType(proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_ALL) - .build() - bstub.interrupt(request) + serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session)) + bstub.interrupt(request.build()) } private[sql] def interruptTag(tag: String): proto.InterruptResponse = { @@ -230,8 +246,8 @@ private[sql] class SparkConnectClient( .setClientType(userAgent) .setInterruptType(proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_TAG) .setOperationTag(tag) - .build() - bstub.interrupt(request) + serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session)) + bstub.interrupt(request.build()) } private[sql] def interruptOperation(id: String): proto.InterruptResponse = { @@ -242,8 +258,8 @@ private[sql] class SparkConnectClient( .setClientType(userAgent) .setInterruptType(proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_OPERATION_ID) .setOperationId(id) - .build() - bstub.interrupt(request) + serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session)) + bstub.interrupt(request.build()) } private[sql] def releaseSession(): proto.ReleaseSessionResponse = { @@ -252,8 +268,7 @@ private[sql] class SparkConnectClient( .setUserContext(userContext) .setSessionId(sessionId) .setClientType(userAgent) - .build() - bstub.releaseSession(request) + bstub.releaseSession(request.build()) } private[this] val tags = new InheritableThreadLocal[mutable.Set[String]] { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala index ea3b578be3b0..b0d9337c6448 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala +++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala @@ -53,9 +53,14 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr override def onNext(req: AddArtifactsRequest): Unit = try { if (this.holder == null) { + val previousSessionId = req.hasClientObservedServerSideSessionId match { + case true => Some(req.getClientObservedServerSideSessionId) + case false => None + } this.holder = SparkConnectService.getOrCreateIsolatedSession( req.getUserContext.getUserId, - req.getSessionId) + req.getSessionId, + previousSessionId) } if (req.hasBeginChunk) { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala index 7a701aea1b78..3dfd29d6a8c6 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala @@ -34,9 +34,14 @@ private[connect] class SparkConnectAnalyzeHandler( extends Logging { def handle(request: proto.AnalyzePlanRequest): Unit = { + val previousSessionId = request.hasClientObservedServerSideSessionId match { + case true => Some(request.getClientObservedServerSideSessionId) + case false => None + } val sessionHolder = SparkConnectService.getOrCreateIsolatedSession( request.getUserContext.getUserId, - request.getSessionId) + request.getSessionId, + previousSessionId) // `withSession` ensures that session-specific artifacts (such as JARs and class files) are // available during processing (such as deserialization). 
sessionHolder.withSession { _ => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala index 78def077f2dd..38bb29efc536 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala @@ -28,17 +28,28 @@ class SparkConnectArtifactStatusesHandler( val responseObserver: StreamObserver[proto.ArtifactStatusesResponse]) extends Logging { - protected def cacheExists(userId: String, sessionId: String, hash: String): Boolean = { + protected def cacheExists( + userId: String, + sessionId: String, + previouslySeenSessionId: Option[String], + hash: String): Boolean = { val session = SparkConnectService - .getOrCreateIsolatedSession(userId, sessionId) + .getOrCreateIsolatedSession(userId, sessionId, previouslySeenSessionId) .session val blockManager = session.sparkContext.env.blockManager blockManager.getStatus(CacheId(session.sessionUUID, hash)).isDefined } def handle(request: proto.ArtifactStatusesRequest): Unit = { + val previousSessionId = request.hasClientObservedServerSideSessionId match { + case true => Some(request.getClientObservedServerSideSessionId) + case false => None + } val holder = SparkConnectService - .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId) + .getOrCreateIsolatedSession( + request.getUserContext.getUserId, + request.getSessionId, + previousSessionId) val builder = proto.ArtifactStatusesResponse.newBuilder() builder.setSessionId(holder.sessionId) @@ -49,6 +60,7 @@ class SparkConnectArtifactStatusesHandler( cacheExists( userId = request.getUserContext.getUserId, sessionId = request.getSessionId, + previouslySeenSessionId = 
previousSessionId, hash = name.stripPrefix("cache/")) } else false builder.putStatuses(name, status.setExists(exists).build()) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala index 8a7ce6d7b48f..6d663867e958 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala @@ -30,9 +30,16 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes extends Logging { def handle(request: proto.ConfigRequest): Unit = { + val previousSessionId = request.hasClientObservedServerSideSessionId match { + case true => Some(request.getClientObservedServerSideSessionId) + case false => None + } val holder = SparkConnectService - .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId) + .getOrCreateIsolatedSession( + request.getUserContext.getUserId, + request.getSessionId, + previousSessionId) val session = holder.session val builder = request.getOperation.getOpTypeCase match { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala index 58e235f15085..e52cfe64a090 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -67,8 +67,15 @@ private[connect] class SparkConnectExecutionManager() extends Logging { * Create a new ExecuteHolder and register it with this global manager and with its session. 
*/ private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): ExecuteHolder = { + val previousSessionId = request.hasClientObservedServerSideSessionId match { + case true => Some(request.getClientObservedServerSideSessionId) + case false => None + } val sessionHolder = SparkConnectService - .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId) + .getOrCreateIsolatedSession( + request.getUserContext.getUserId, + request.getSessionId, + previousSessionId) val executeHolder = new ExecuteHolder(request, sessionHolder) executionsLock.synchronized { // Check if the operation already exists, both in active executions, and in the graveyard diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala index b5a3c986d169..e461877d8454 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala @@ -32,9 +32,13 @@ class SparkConnectFetchErrorDetailsHandler( responseObserver: StreamObserver[proto.FetchErrorDetailsResponse]) { def handle(v: proto.FetchErrorDetailsRequest): Unit = { + val previousSessionId = v.hasClientObservedServerSideSessionId match { + case true => Some(v.getClientObservedServerSideSessionId) + case false => None + } val sessionHolder = SparkConnectService - .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId) + .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, previousSessionId) val response = Option(sessionHolder.errorIdToError.getIfPresent(v.getErrorId)) .map { error => diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala index 9e1ab16208f2..ae38e55d3c67 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala @@ -28,9 +28,13 @@ class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.Interr extends Logging { def handle(v: proto.InterruptRequest): Unit = { + val previousSessionId = v.hasClientObservedServerSideSessionId match { + case true => Some(v.getClientObservedServerSideSessionId) + case false => None + } val sessionHolder = SparkConnectService - .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId) + .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, previousSessionId) val interruptedIds = v.getInterruptType match { case proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_ALL => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala index ecad8c9c73a1..534937f84eae 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala @@ -29,8 +29,14 @@ class SparkConnectReattachExecuteHandler( extends Logging { def handle(v: proto.ReattachExecuteRequest): Unit = { + val previousSessionId = v.hasClientObservedServerSideSessionId match { + case true => Some(v.getClientObservedServerSideSessionId) + case false => None + } val sessionHolder = 
SparkConnectService.sessionManager - .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId)) + .getIsolatedSession( + SessionKey(v.getUserContext.getUserId, v.getSessionId), + previousSessionId) val executeHolder = sessionHolder.executeHolder(v.getOperationId).getOrElse { if (SparkConnectService.executionManager diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala index 88c1456602d3..a2dbf3b2eec9 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala @@ -28,8 +28,10 @@ class SparkConnectReleaseExecuteHandler( extends Logging { def handle(v: proto.ReleaseExecuteRequest): Unit = { + // We do not validate the spark session for ReleaseExecute on the server, + // leaving the validation only to happen on the client side. 
val sessionHolder = SparkConnectService.sessionManager - .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId)) + .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId), None) val responseBuilder = proto.ReleaseExecuteResponse .newBuilder() diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseSessionHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseSessionHandler.scala index c8a3ceab674f..ec7a7f3bd242 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseSessionHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseSessionHandler.scala @@ -33,8 +33,10 @@ class SparkConnectReleaseSessionHandler( // If the session doesn't exist, this will just be a noop. val key = SessionKey(v.getUserContext.getUserId, v.getSessionId) // if the session is present, update the server-side session ID. + // Note we do not validate the previously seen server-side session id. 
val maybeSession = SparkConnectService.sessionManager.getIsolatedSessionIfPresent(key) maybeSession.foreach(f => responseBuilder.setServerSideSessionId(f.serverSessionId)) + SparkConnectService.sessionManager.closeSession(key) responseObserver.onNext(responseBuilder.build()) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index e96e5dfcac08..9324e8e6c5f1 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -303,8 +303,13 @@ object SparkConnectService extends Logging { /** * Based on the userId and sessionId, find or create a new SparkSession. */ - def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder = { - sessionManager.getOrCreateIsolatedSession(SessionKey(userId, sessionId)) + def getOrCreateIsolatedSession( + userId: String, + sessionId: String, + previoslyObservedSessionId: Option[String]): SessionHolder = { + sessionManager.getOrCreateIsolatedSession( + SessionKey(userId, sessionId), + previoslyObservedSessionId) } /** diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala index 4da728b95a33..f8febbccfa6f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala @@ -53,11 +53,24 @@ class SparkConnectSessionManager extends Logging { /** Executor for the periodic maintenance */ private var scheduledExecutor: 
Option[ScheduledExecutorService] = None + private def validateSessionId( + key: SessionKey, + sessionUUID: String, + previouslyObservedSessionId: String) = { + if (sessionUUID != previouslyObservedSessionId) { + throw new SparkSQLException( + errorClass = "INVALID_HANDLE.SESSION_CHANGED", + messageParameters = Map("handle" -> key.sessionId)) + } + } + /** * Based on the userId and sessionId, find or create a new SparkSession. */ - private[connect] def getOrCreateIsolatedSession(key: SessionKey): SessionHolder = { - getSession( + private[connect] def getOrCreateIsolatedSession( + key: SessionKey, + previouslyObservedSesssionId: Option[String]): SessionHolder = { + val holder = getSession( key, Some(() => { // Executed under sessionsState lock in getSession, to guard against concurrent removal @@ -67,13 +80,18 @@ class SparkConnectSessionManager extends Logging { holder.initializeSession() holder })) + previouslyObservedSesssionId.foreach(sessionId => + validateSessionId(key, holder.session.sessionUUID, sessionId)) + holder } /** * Based on the userId and sessionId, find an existing SparkSession or throw error. 
*/ - private[connect] def getIsolatedSession(key: SessionKey): SessionHolder = { - getSession( + private[connect] def getIsolatedSession( + key: SessionKey, + previouslyObservedSesssionId: Option[String]): SessionHolder = { + val holder = getSession( key, Some(() => { logDebug(s"Session not found: $key") @@ -87,6 +105,9 @@ class SparkConnectSessionManager extends Logging { messageParameters = Map("handle" -> key.sessionId)) } })) + previouslyObservedSesssionId.foreach(sessionId => + validateSessionId(key, holder.session.sessionUUID, sessionId)) + holder } /** diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala index 3e22dc5c3fad..25e6cc48a199 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala @@ -300,7 +300,9 @@ class ReattachableExecuteSuite extends SparkConnectServerTest { test("SPARK-46186 interrupt directly after query start") { // register a sleep udf in the session val serverSession = - SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session + SparkConnectService + .getOrCreateIsolatedSession(defaultUserId, defaultSessionId, None) + .session serverSession.udf.register( "sleep", ((ms: Int) => { @@ -384,7 +386,9 @@ class ReattachableExecuteSuite extends SparkConnectServerTest { test("long sleeping query") { // register udf directly on the server, we're not testing client UDFs here... 
val serverSession = - SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session + SparkConnectService + .getOrCreateIsolatedSession(defaultUserId, defaultSessionId, None) + .session serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms })) // query will be sleeping and not returning results, while having multiple reattach withSparkEnvConfs( diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala index 18886fde7e0e..dafcaa9e0225 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala @@ -553,7 +553,7 @@ class SparkConnectServiceSuite val instance = new SparkConnectService(false) // Add an always crashing UDF - val session = SparkConnectService.getOrCreateIsolatedSession("c1", sessionId).session + val session = SparkConnectService.getOrCreateIsolatedSession("c1", sessionId, None).session val sleep: Long => Long = { time => Thread.sleep(time) time @@ -624,7 +624,7 @@ class SparkConnectServiceSuite val instance = new SparkConnectService(false) // Add an always crashing UDF - val session = SparkConnectService.getOrCreateIsolatedSession("c1", sessionId).session + val session = SparkConnectService.getOrCreateIsolatedSession("c1", sessionId, None).session val instaKill: Long => Long = { _ => throw new Exception("Kaboom") } @@ -818,7 +818,7 @@ class SparkConnectServiceSuite when(restartedQuery.id).thenReturn(DEFAULT_UUID) when(restartedQuery.runId).thenReturn(DEFAULT_UUID) SparkConnectService.streamingSessionManager.registerNewStreamingQuery( - SparkConnectService.getOrCreateIsolatedSession("c1", sessionId), + SparkConnectService.getOrCreateIsolatedSession("c1", sessionId, 
None), restartedQuery) f(verifyEvents) } @@ -904,7 +904,7 @@ class SparkConnectServiceSuite case e: SparkListenerConnectOperationStarted => semaphoreStarted.release() val sessionHolder = - SparkConnectService.getOrCreateIsolatedSession(e.userId, e.sessionId) + SparkConnectService.getOrCreateIsolatedSession(e.userId, e.sessionId, None) executeHolder = sessionHolder.executeHolder(e.operationId) case _ => } diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ArtifactStatusesHandlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ArtifactStatusesHandlerSuite.scala index 8fabcf61cb6f..7ce3ff46f553 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ArtifactStatusesHandlerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ArtifactStatusesHandlerSuite.scala @@ -48,6 +48,7 @@ class ArtifactStatusesHandlerSuite extends SharedSparkSession with ResourceHelpe override protected def cacheExists( userId: String, sessionId: String, + previoslySeenSessionId: Option[String], hash: String): Boolean = { exist.contains(hash) } diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala index ebcd1de60057..33315682bd73 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala @@ -75,7 +75,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp val errorId = UUID.randomUUID().toString() val sessionHolder = SparkConnectService - .getOrCreateIsolatedSession(userId, sessionId) + .getOrCreateIsolatedSession(userId, sessionId, None) 
sessionHolder.errorIdToError.put(errorId, testError) @@ -125,7 +125,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp val errorId = UUID.randomUUID().toString() SparkConnectService - .getOrCreateIsolatedSession(userId, sessionId) + .getOrCreateIsolatedSession(userId, sessionId, None) .errorIdToError .put(errorId, testError) @@ -138,7 +138,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp assert( SparkConnectService - .getOrCreateIsolatedSession(userId, sessionId) + .getOrCreateIsolatedSession(userId, sessionId, None) .errorIdToError .size() == 0) } @@ -149,7 +149,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp val errorId = UUID.randomUUID().toString() SparkConnectService - .getOrCreateIsolatedSession(userId, sessionId) + .getOrCreateIsolatedSession(userId, sessionId, None) .errorIdToError .put(errorId, testError) @@ -175,7 +175,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp val errorId = UUID.randomUUID().toString() SparkConnectService - .getOrCreateIsolatedSession(userId, sessionId) + .getOrCreateIsolatedSession(userId, sessionId, None) .errorIdToError .put(errorId, testError) @@ -192,7 +192,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp val errorId = UUID.randomUUID().toString() SparkConnectService - .getOrCreateIsolatedSession(userId, sessionId) + .getOrCreateIsolatedSession(userId, sessionId, None) .errorIdToError .put(errorId, testError) diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala index 7776148077fc..33560cd53f6b 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala +++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala @@ -202,4 +202,24 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest { } } } + + test("SessionValidation: server validates that the client is talking to the same session.") { + val sessionId = UUID.randomUUID.toString() + val userId = "Y" + withClient(sessionId = sessionId, userId = userId) { client => + // this will create the session, and then ReleaseSession at the end of withClient. + val query = client.execute(buildPlan("SELECT 1")) + query.hasNext // trigger execution + // Same session id. + val new_query = client.execute(buildPlan("SELECT 1 + 1")) + new_query.hasNext // trigger execution + // Change the server session id in the client for testing and try to run something. + client.hijackServerSideSessionIdForTesting("-testing") + val queryError = intercept[SparkException] { + val newest_query = client.execute(buildPlan("SELECT 1 + 1 + 1")) + newest_query.hasNext + } + assert(queryError.getMessage.contains("INVALID_HANDLE.SESSION_CHANGED")) + } + } } diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManagerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManagerSuite.scala index fadbd9fa502e..42bb93de05e2 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManagerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManagerSuite.scala @@ -35,7 +35,7 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA test("sessionId needs to be an UUID") { val key = SessionKey("user", "not an uuid") val exGetOrCreate = intercept[SparkSQLException] { - SparkConnectService.sessionManager.getOrCreateIsolatedSession(key) + SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, 
None) } assert(exGetOrCreate.getErrorClass == "INVALID_HANDLE.FORMAT") } @@ -44,33 +44,51 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA "getOrCreateIsolatedSession/getIsolatedSession/getIsolatedSessionIfPresent " + "gets the existing session") { val key = SessionKey("user", UUID.randomUUID().toString) - val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key) + val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None) val sessionGetOrCreate = - SparkConnectService.sessionManager.getOrCreateIsolatedSession(key) + SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None) assert(sessionGetOrCreate === sessionHolder) - val sessionGet = SparkConnectService.sessionManager.getIsolatedSession(key) + val sessionGet = SparkConnectService.sessionManager.getIsolatedSession(key, None) assert(sessionGet === sessionHolder) val sessionGetIfPresent = SparkConnectService.sessionManager.getIsolatedSessionIfPresent(key) assert(sessionGetIfPresent.get === sessionHolder) } + test("client-observed session id validation works") { + val key = SessionKey("user", UUID.randomUUID().toString) + val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None) + // Works if the client doesn't set the observed session id. + SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None) + // Works with the correct existing session id. + SparkConnectService.sessionManager.getOrCreateIsolatedSession( + key, + Some(sessionHolder.session.sessionUUID)) + // Fails with the different session id. 
+ val exGet = intercept[SparkSQLException] { + SparkConnectService.sessionManager.getOrCreateIsolatedSession( + key, + Some(sessionHolder.session.sessionUUID + "invalid")) + } + assert(exGet.getErrorClass == "INVALID_HANDLE.SESSION_CHANGED") + } + test( "getOrCreateIsolatedSession/getIsolatedSession/getIsolatedSessionIfPresent " + "doesn't recreate closed session") { val key = SessionKey("user", UUID.randomUUID().toString) - val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key) + val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None) SparkConnectService.sessionManager.closeSession(key) val exGetOrCreate = intercept[SparkSQLException] { - SparkConnectService.sessionManager.getOrCreateIsolatedSession(key) + SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None) } assert(exGetOrCreate.getErrorClass == "INVALID_HANDLE.SESSION_CLOSED") val exGet = intercept[SparkSQLException] { - SparkConnectService.sessionManager.getIsolatedSession(key) + SparkConnectService.sessionManager.getIsolatedSession(key, None) } assert(exGet.getErrorClass == "INVALID_HANDLE.SESSION_CLOSED") @@ -82,7 +100,7 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA val key = SessionKey("user", UUID.randomUUID().toString) val exGet = intercept[SparkSQLException] { - SparkConnectService.sessionManager.getIsolatedSession(key) + SparkConnectService.sessionManager.getIsolatedSession(key, None) } assert(exGet.getErrorClass == "INVALID_HANDLE.SESSION_NOT_FOUND") @@ -92,7 +110,7 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA test("SessionHolder with custom expiration time is not cleaned up due to inactivity") { val key = SessionKey("user", UUID.randomUUID().toString) - val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key) + val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None) 
assert( SparkConnectService.sessionManager.listActiveSessions.exists( @@ -117,7 +135,7 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA test("SessionHolder is recorded with status closed after close") { val key = SessionKey("user", UUID.randomUUID().toString) - val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key) + val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None) val activeSessionInfo = SparkConnectService.sessionManager.listActiveSessions.find( _.sessionId == sessionHolder.sessionId) diff --git a/docs/sql-error-conditions-invalid-handle-error-class.md b/docs/sql-error-conditions-invalid-handle-error-class.md index ef21dd6479bf..953a8e00865d 100644 --- a/docs/sql-error-conditions-invalid-handle-error-class.md +++ b/docs/sql-error-conditions-invalid-handle-error-class.md @@ -46,6 +46,10 @@ Operation already exists. Operation not found. +## SESSION_CHANGED + +The existing Spark server driver instance has restarted. Please reconnect. + ## SESSION_CLOSED Session was closed. 
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 2a1f25d83f47..3474d7078564 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -1054,6 +1054,8 @@ class SparkConnectClient(object): client_type=self._builder.userAgent, tags=list(self.get_tags()), ) + if self._server_session_id is not None: + req.client_observed_server_side_session_id = self._server_session_id if self._user_id: req.user_context.user_id = self._user_id return req @@ -1061,6 +1063,8 @@ class SparkConnectClient(object): def _analyze_plan_request_with_metadata(self) -> pb2.AnalyzePlanRequest: req = pb2.AnalyzePlanRequest() req.session_id = self._session_id + if self._server_session_id is not None: + req.client_observed_server_side_session_id = self._server_session_id req.client_type = self._builder.userAgent if self._user_id: req.user_context.user_id = self._user_id @@ -1381,6 +1385,8 @@ class SparkConnectClient(object): def _config_request_with_metadata(self) -> pb2.ConfigRequest: req = pb2.ConfigRequest() req.session_id = self._session_id + if self._server_session_id is not None: + req.client_observed_server_side_session_id = self._server_session_id req.client_type = self._builder.userAgent if self._user_id: req.user_context.user_id = self._user_id @@ -1416,6 +1422,8 @@ class SparkConnectClient(object): The result of the config call. 
""" req = self._config_request_with_metadata() + if self._server_session_id is not None: + req.client_observed_server_side_session_id = self._server_session_id req.operation.CopyFrom(operation) try: for attempt in self._retrying(): @@ -1432,6 +1440,8 @@ class SparkConnectClient(object): ) -> pb2.InterruptRequest: req = pb2.InterruptRequest() req.session_id = self._session_id + if self._server_session_id is not None: + req.client_observed_server_side_session_id = self._server_session_id req.client_type = self._builder.userAgent if interrupt_type == "all": req.interrupt_type = pb2.InterruptRequest.InterruptType.INTERRUPT_TYPE_ALL @@ -1591,6 +1601,8 @@ class SparkConnectClient(object): client_type=self._builder.userAgent, error_id=info.metadata["errorId"], ) + if self._server_session_id is not None: + req.client_observed_server_side_session_id = self._server_session_id if self._user_id: req.user_context.user_id = self._user_id diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py index 1941900ae69d..af42f9b73628 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.py +++ b/python/pyspark/sql/connect/proto/base_pb2.py @@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...] 
+ b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...] ) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -58,167 +58,167 @@ if _descriptor._USE_C_DESCRIPTORS == False: _USERCONTEXT._serialized_start = 337 _USERCONTEXT._serialized_end = 459 _ANALYZEPLANREQUEST._serialized_start = 462 - _ANALYZEPLANREQUEST._serialized_end = 2883 - _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1657 - _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1706 - _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1709 - _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 2024 - _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1852 - _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 2024 - _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 2026 - _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 2116 - _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 2118 - _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 2168 - _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 2170 - _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 2224 - _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 2226 - _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 2279 - _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 2281 - _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 2295 - _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 2297 - _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2338 - _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2340 - _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2461 - _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2463 - 
_ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2518 - _ANALYZEPLANREQUEST_PERSIST._serialized_start = 2521 - _ANALYZEPLANREQUEST_PERSIST._serialized_end = 2672 - _ANALYZEPLANREQUEST_UNPERSIST._serialized_start = 2674 - _ANALYZEPLANREQUEST_UNPERSIST._serialized_end = 2784 - _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_start = 2786 - _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_end = 2856 - _ANALYZEPLANRESPONSE._serialized_start = 2886 - _ANALYZEPLANRESPONSE._serialized_end = 4628 - _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 4047 - _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 4104 - _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 4106 - _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 4154 - _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 4156 - _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 4201 - _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 4203 - _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 4239 - _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 4241 - _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 4289 - _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 4291 - _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 4325 - _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 4327 - _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 4367 - _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 4369 - _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 4428 - _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 4430 - _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 4469 - _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 4471 - _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 4509 - _ANALYZEPLANRESPONSE_PERSIST._serialized_start = 2521 - _ANALYZEPLANRESPONSE_PERSIST._serialized_end = 2530 - _ANALYZEPLANRESPONSE_UNPERSIST._serialized_start = 2674 - _ANALYZEPLANRESPONSE_UNPERSIST._serialized_end = 2685 - _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_start = 4535 - _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_end = 4618 - 
_EXECUTEPLANREQUEST._serialized_start = 4631 - _EXECUTEPLANREQUEST._serialized_end = 5175 - _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_start = 4977 - _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_end = 5142 - _EXECUTEPLANRESPONSE._serialized_start = 5178 - _EXECUTEPLANRESPONSE._serialized_end = 7391 - _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 6527 - _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 6598 - _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 6600 - _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 6718 - _EXECUTEPLANRESPONSE_METRICS._serialized_start = 6721 - _EXECUTEPLANRESPONSE_METRICS._serialized_end = 7238 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 6816 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 7148 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 7025 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 7148 - _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 7150 - _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 7238 - _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 7240 - _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 7356 - _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_start = 7358 - _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_end = 7374 - _KEYVALUE._serialized_start = 7393 - _KEYVALUE._serialized_end = 7458 - _CONFIGREQUEST._serialized_start = 7461 - _CONFIGREQUEST._serialized_end = 8489 - _CONFIGREQUEST_OPERATION._serialized_start = 7681 - _CONFIGREQUEST_OPERATION._serialized_end = 8179 - _CONFIGREQUEST_SET._serialized_start = 8181 - _CONFIGREQUEST_SET._serialized_end = 8233 - _CONFIGREQUEST_GET._serialized_start = 8235 - _CONFIGREQUEST_GET._serialized_end = 8260 - _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 8262 - _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 8325 - _CONFIGREQUEST_GETOPTION._serialized_start = 8327 - 
_CONFIGREQUEST_GETOPTION._serialized_end = 8358 - _CONFIGREQUEST_GETALL._serialized_start = 8360 - _CONFIGREQUEST_GETALL._serialized_end = 8408 - _CONFIGREQUEST_UNSET._serialized_start = 8410 - _CONFIGREQUEST_UNSET._serialized_end = 8437 - _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 8439 - _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 8473 - _CONFIGRESPONSE._serialized_start = 8492 - _CONFIGRESPONSE._serialized_end = 8667 - _ADDARTIFACTSREQUEST._serialized_start = 8670 - _ADDARTIFACTSREQUEST._serialized_end = 9541 - _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 9057 - _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 9110 - _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 9112 - _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 9223 - _ADDARTIFACTSREQUEST_BATCH._serialized_start = 9225 - _ADDARTIFACTSREQUEST_BATCH._serialized_end = 9318 - _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 9321 - _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 9514 - _ADDARTIFACTSRESPONSE._serialized_start = 9544 - _ADDARTIFACTSRESPONSE._serialized_end = 9816 - _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 9735 - _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 9816 - _ARTIFACTSTATUSESREQUEST._serialized_start = 9819 - _ARTIFACTSTATUSESREQUEST._serialized_end = 10014 - _ARTIFACTSTATUSESRESPONSE._serialized_start = 10017 - _ARTIFACTSTATUSESRESPONSE._serialized_end = 10369 - _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_start = 10212 - _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_end = 10327 - _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_start = 10329 - _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_end = 10369 - _INTERRUPTREQUEST._serialized_start = 10372 - _INTERRUPTREQUEST._serialized_end = 10844 - _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_start = 10687 - _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_end = 10815 - _INTERRUPTRESPONSE._serialized_start = 10847 - 
_INTERRUPTRESPONSE._serialized_end = 10991 - _REATTACHOPTIONS._serialized_start = 10993 - _REATTACHOPTIONS._serialized_end = 11046 - _REATTACHEXECUTEREQUEST._serialized_start = 11049 - _REATTACHEXECUTEREQUEST._serialized_end = 11324 - _RELEASEEXECUTEREQUEST._serialized_start = 11327 - _RELEASEEXECUTEREQUEST._serialized_end = 11781 - _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_start = 11693 - _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_end = 11705 - _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_start = 11707 - _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_end = 11754 - _RELEASEEXECUTERESPONSE._serialized_start = 11784 - _RELEASEEXECUTERESPONSE._serialized_end = 11949 - _RELEASESESSIONREQUEST._serialized_start = 11952 - _RELEASESESSIONREQUEST._serialized_end = 12123 - _RELEASESESSIONRESPONSE._serialized_start = 12125 - _RELEASESESSIONRESPONSE._serialized_end = 12233 - _FETCHERRORDETAILSREQUEST._serialized_start = 12236 - _FETCHERRORDETAILSREQUEST._serialized_end = 12437 - _FETCHERRORDETAILSRESPONSE._serialized_start = 12440 - _FETCHERRORDETAILSRESPONSE._serialized_end = 13995 - _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 12669 - _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 12843 - _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_start = 12846 - _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_end = 13214 - _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_start = 13177 - _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_end = 13214 - _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 13217 - _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 13626 - _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 13528 - _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 13596 - _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 13629 - _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 13976 - 
_SPARKCONNECTSERVICE._serialized_start = 13998 - _SPARKCONNECTSERVICE._serialized_end = 14944 + _ANALYZEPLANREQUEST._serialized_end = 3014 + _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1745 + _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1794 + _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1797 + _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 2112 + _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1940 + _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 2112 + _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 2114 + _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 2204 + _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 2206 + _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 2256 + _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 2258 + _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 2312 + _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 2314 + _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 2367 + _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 2369 + _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 2383 + _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 2385 + _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2426 + _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2428 + _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2549 + _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2551 + _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2606 + _ANALYZEPLANREQUEST_PERSIST._serialized_start = 2609 + _ANALYZEPLANREQUEST_PERSIST._serialized_end = 2760 + _ANALYZEPLANREQUEST_UNPERSIST._serialized_start = 2762 + _ANALYZEPLANREQUEST_UNPERSIST._serialized_end = 2872 + _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_start = 2874 + _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_end = 2944 + _ANALYZEPLANRESPONSE._serialized_start = 3017 + _ANALYZEPLANRESPONSE._serialized_end = 4759 + _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 4178 + _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 4235 + _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 
4237 + _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 4285 + _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 4287 + _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 4332 + _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 4334 + _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 4370 + _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 4372 + _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 4420 + _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 4422 + _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 4456 + _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 4458 + _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 4498 + _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 4500 + _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 4559 + _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 4561 + _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 4600 + _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 4602 + _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 4640 + _ANALYZEPLANRESPONSE_PERSIST._serialized_start = 2609 + _ANALYZEPLANRESPONSE_PERSIST._serialized_end = 2618 + _ANALYZEPLANRESPONSE_UNPERSIST._serialized_start = 2762 + _ANALYZEPLANRESPONSE_UNPERSIST._serialized_end = 2773 + _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_start = 4666 + _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_end = 4749 + _EXECUTEPLANREQUEST._serialized_start = 4762 + _EXECUTEPLANREQUEST._serialized_end = 5437 + _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_start = 5196 + _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_end = 5361 + _EXECUTEPLANRESPONSE._serialized_start = 5440 + _EXECUTEPLANRESPONSE._serialized_end = 7653 + _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 6789 + _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 6860 + _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 6862 + _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 6980 + _EXECUTEPLANRESPONSE_METRICS._serialized_start = 6983 + _EXECUTEPLANRESPONSE_METRICS._serialized_end = 
7500 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 7078 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 7410 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 7287 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 7410 + _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 7412 + _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 7500 + _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 7502 + _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 7618 + _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_start = 7620 + _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_end = 7636 + _KEYVALUE._serialized_start = 7655 + _KEYVALUE._serialized_end = 7720 + _CONFIGREQUEST._serialized_start = 7723 + _CONFIGREQUEST._serialized_end = 8882 + _CONFIGREQUEST_OPERATION._serialized_start = 8031 + _CONFIGREQUEST_OPERATION._serialized_end = 8529 + _CONFIGREQUEST_SET._serialized_start = 8531 + _CONFIGREQUEST_SET._serialized_end = 8583 + _CONFIGREQUEST_GET._serialized_start = 8585 + _CONFIGREQUEST_GET._serialized_end = 8610 + _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 8612 + _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 8675 + _CONFIGREQUEST_GETOPTION._serialized_start = 8677 + _CONFIGREQUEST_GETOPTION._serialized_end = 8708 + _CONFIGREQUEST_GETALL._serialized_start = 8710 + _CONFIGREQUEST_GETALL._serialized_end = 8758 + _CONFIGREQUEST_UNSET._serialized_start = 8760 + _CONFIGREQUEST_UNSET._serialized_end = 8787 + _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 8789 + _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 8823 + _CONFIGRESPONSE._serialized_start = 8885 + _CONFIGRESPONSE._serialized_end = 9060 + _ADDARTIFACTSREQUEST._serialized_start = 9063 + _ADDARTIFACTSREQUEST._serialized_end = 10065 + _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 9538 + _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 9591 + 
_ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 9593 + _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 9704 + _ADDARTIFACTSREQUEST_BATCH._serialized_start = 9706 + _ADDARTIFACTSREQUEST_BATCH._serialized_end = 9799 + _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 9802 + _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 9995 + _ADDARTIFACTSRESPONSE._serialized_start = 10068 + _ADDARTIFACTSRESPONSE._serialized_end = 10340 + _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 10259 + _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 10340 + _ARTIFACTSTATUSESREQUEST._serialized_start = 10343 + _ARTIFACTSTATUSESREQUEST._serialized_end = 10669 + _ARTIFACTSTATUSESRESPONSE._serialized_start = 10672 + _ARTIFACTSTATUSESRESPONSE._serialized_end = 11024 + _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_start = 10867 + _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_end = 10982 + _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_start = 10984 + _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_end = 11024 + _INTERRUPTREQUEST._serialized_start = 11027 + _INTERRUPTREQUEST._serialized_end = 11630 + _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_start = 11430 + _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_end = 11558 + _INTERRUPTRESPONSE._serialized_start = 11633 + _INTERRUPTRESPONSE._serialized_end = 11777 + _REATTACHOPTIONS._serialized_start = 11779 + _REATTACHOPTIONS._serialized_end = 11832 + _REATTACHEXECUTEREQUEST._serialized_start = 11835 + _REATTACHEXECUTEREQUEST._serialized_end = 12241 + _RELEASEEXECUTEREQUEST._serialized_start = 12244 + _RELEASEEXECUTEREQUEST._serialized_end = 12829 + _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_start = 12698 + _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_end = 12710 + _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_start = 12712 + _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_end = 12759 + _RELEASEEXECUTERESPONSE._serialized_start = 12832 + 
_RELEASEEXECUTERESPONSE._serialized_end = 12997 + _RELEASESESSIONREQUEST._serialized_start = 13000 + _RELEASESESSIONREQUEST._serialized_end = 13171 + _RELEASESESSIONRESPONSE._serialized_start = 13173 + _RELEASESESSIONRESPONSE._serialized_end = 13281 + _FETCHERRORDETAILSREQUEST._serialized_start = 13284 + _FETCHERRORDETAILSREQUEST._serialized_end = 13616 + _FETCHERRORDETAILSRESPONSE._serialized_start = 13619 + _FETCHERRORDETAILSRESPONSE._serialized_end = 15174 + _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 13848 + _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 14022 + _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_start = 14025 + _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_end = 14393 + _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_start = 14356 + _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_end = 14393 + _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 14396 + _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 14805 + _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 14707 + _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 14775 + _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 14808 + _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 15155 + _SPARKCONNECTSERVICE._serialized_start = 15177 + _SPARKCONNECTSERVICE._serialized_end = 16123 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi index 5ed2d207aca5..86dec5711ece 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.pyi +++ b/python/pyspark/sql/connect/proto/base_pb2.pyi @@ -478,6 +478,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message): ) -> None: ... 
SESSION_ID_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int SCHEMA_FIELD_NUMBER: builtins.int @@ -501,6 +502,12 @@ class AnalyzePlanRequest(google.protobuf.message.Message): collate streaming responses from different queries within the dedicated session. The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` """ + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. + """ @property def user_context(self) -> global___UserContext: """(Required) User context""" @@ -539,6 +546,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message): self, *, session_id: builtins.str = ..., + client_observed_server_side_session_id: builtins.str | None = ..., user_context: global___UserContext | None = ..., client_type: builtins.str | None = ..., schema: global___AnalyzePlanRequest.Schema | None = ..., @@ -558,10 +566,14 @@ class AnalyzePlanRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", "analyze", b"analyze", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "ddl_parse", @@ -597,10 +609,14 @@ class AnalyzePlanRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", "analyze", b"analyze", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "ddl_parse", @@ -636,6 +652,13 @@ class 
AnalyzePlanRequest(google.protobuf.message.Message): ], ) -> None: ... @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... @@ -1060,6 +1083,7 @@ class ExecutePlanRequest(google.protobuf.message.Message): ) -> typing_extensions.Literal["reattach_options", "extension"] | None: ... SESSION_ID_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int OPERATION_ID_FIELD_NUMBER: builtins.int PLAN_FIELD_NUMBER: builtins.int @@ -1074,6 +1098,12 @@ class ExecutePlanRequest(google.protobuf.message.Message): collate streaming responses from different queries within the dedicated session. The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` """ + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. 
+ """ @property def user_context(self) -> global___UserContext: """(Required) User context @@ -1116,6 +1146,7 @@ class ExecutePlanRequest(google.protobuf.message.Message): self, *, session_id: builtins.str = ..., + client_observed_server_side_session_id: builtins.str | None = ..., user_context: global___UserContext | None = ..., operation_id: builtins.str | None = ..., plan: global___Plan | None = ..., @@ -1127,10 +1158,14 @@ class ExecutePlanRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", "_operation_id", b"_operation_id", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "operation_id", @@ -1144,10 +1179,14 @@ class ExecutePlanRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", "_operation_id", b"_operation_id", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "operation_id", @@ -1165,6 +1204,13 @@ class ExecutePlanRequest(google.protobuf.message.Message): ], ) -> None: ... @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... @@ -1836,6 +1882,7 @@ class ConfigRequest(google.protobuf.message.Message): def ClearField(self, field_name: typing_extensions.Literal["keys", b"keys"]) -> None: ... 
SESSION_ID_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int OPERATION_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int @@ -1847,6 +1894,12 @@ class ConfigRequest(google.protobuf.message.Message): collate streaming responses from different queries within the dedicated session. The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` """ + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. + """ @property def user_context(self) -> global___UserContext: """(Required) User context""" @@ -1862,6 +1915,7 @@ class ConfigRequest(google.protobuf.message.Message): self, *, session_id: builtins.str = ..., + client_observed_server_side_session_id: builtins.str | None = ..., user_context: global___UserContext | None = ..., operation: global___ConfigRequest.Operation | None = ..., client_type: builtins.str | None = ..., @@ -1869,8 +1923,12 @@ class ConfigRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "operation", @@ -1882,8 +1940,12 @@ class ConfigRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "operation", @@ -1894,6 +1956,14 @@ class ConfigRequest(google.protobuf.message.Message): 
b"user_context", ], ) -> None: ... + @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... @@ -2090,6 +2160,7 @@ class AddArtifactsRequest(google.protobuf.message.Message): SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int BATCH_FIELD_NUMBER: builtins.int BEGIN_CHUNK_FIELD_NUMBER: builtins.int @@ -2105,6 +2176,12 @@ class AddArtifactsRequest(google.protobuf.message.Message): @property def user_context(self) -> global___UserContext: """User context""" + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. + """ client_type: builtins.str """Provides optional information about the client sending the request. 
This field can be used for language or version specific information and is only intended for @@ -2128,6 +2205,7 @@ class AddArtifactsRequest(google.protobuf.message.Message): *, session_id: builtins.str = ..., user_context: global___UserContext | None = ..., + client_observed_server_side_session_id: builtins.str | None = ..., client_type: builtins.str | None = ..., batch: global___AddArtifactsRequest.Batch | None = ..., begin_chunk: global___AddArtifactsRequest.BeginChunkedArtifact | None = ..., @@ -2136,6 +2214,8 @@ class AddArtifactsRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", "batch", @@ -2144,6 +2224,8 @@ class AddArtifactsRequest(google.protobuf.message.Message): b"begin_chunk", "chunk", b"chunk", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "payload", @@ -2155,6 +2237,8 @@ class AddArtifactsRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", "batch", @@ -2163,6 +2247,8 @@ class AddArtifactsRequest(google.protobuf.message.Message): b"begin_chunk", "chunk", b"chunk", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "payload", @@ -2174,6 +2260,13 @@ class AddArtifactsRequest(google.protobuf.message.Message): ], ) -> None: ... @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... 
+ @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... @@ -2262,6 +2355,7 @@ class ArtifactStatusesRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor SESSION_ID_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int NAMES_FIELD_NUMBER: builtins.int @@ -2273,6 +2367,12 @@ class ArtifactStatusesRequest(google.protobuf.message.Message): collate streaming responses from different queries within the dedicated session. The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` """ + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. 
+ """ @property def user_context(self) -> global___UserContext: """User context""" @@ -2296,6 +2396,7 @@ class ArtifactStatusesRequest(google.protobuf.message.Message): self, *, session_id: builtins.str = ..., + client_observed_server_side_session_id: builtins.str | None = ..., user_context: global___UserContext | None = ..., client_type: builtins.str | None = ..., names: collections.abc.Iterable[builtins.str] | None = ..., @@ -2303,8 +2404,12 @@ class ArtifactStatusesRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "user_context", @@ -2314,8 +2419,12 @@ class ArtifactStatusesRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "names", @@ -2326,6 +2435,14 @@ class ArtifactStatusesRequest(google.protobuf.message.Message): b"user_context", ], ) -> None: ... + @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... 
@@ -2447,6 +2564,7 @@ class InterruptRequest(google.protobuf.message.Message): """Interrupt the running execution within the session with the provided operation_id.""" SESSION_ID_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int INTERRUPT_TYPE_FIELD_NUMBER: builtins.int @@ -2460,6 +2578,12 @@ class InterruptRequest(google.protobuf.message.Message): collate streaming responses from different queries within the dedicated session. The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` """ + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. + """ @property def user_context(self) -> global___UserContext: """(Required) User context""" @@ -2478,6 +2602,7 @@ class InterruptRequest(google.protobuf.message.Message): self, *, session_id: builtins.str = ..., + client_observed_server_side_session_id: builtins.str | None = ..., user_context: global___UserContext | None = ..., client_type: builtins.str | None = ..., interrupt_type: global___InterruptRequest.InterruptType.ValueType = ..., @@ -2487,8 +2612,12 @@ class InterruptRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "interrupt", @@ -2504,8 +2633,12 @@ class InterruptRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + 
"client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "interrupt", @@ -2523,6 +2656,13 @@ class InterruptRequest(google.protobuf.message.Message): ], ) -> None: ... @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... @@ -2602,6 +2742,7 @@ class ReattachExecuteRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor SESSION_ID_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int OPERATION_ID_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int @@ -2612,6 +2753,12 @@ class ReattachExecuteRequest(google.protobuf.message.Message): The session_id of the request to reattach to. This must be an id of existing session. """ + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. 
+ """ @property def user_context(self) -> global___UserContext: """(Required) User context @@ -2643,6 +2790,7 @@ class ReattachExecuteRequest(google.protobuf.message.Message): self, *, session_id: builtins.str = ..., + client_observed_server_side_session_id: builtins.str | None = ..., user_context: global___UserContext | None = ..., operation_id: builtins.str = ..., client_type: builtins.str | None = ..., @@ -2651,10 +2799,14 @@ class ReattachExecuteRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", "_last_response_id", b"_last_response_id", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "last_response_id", @@ -2666,10 +2818,14 @@ class ReattachExecuteRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", "_last_response_id", b"_last_response_id", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "last_response_id", @@ -2683,6 +2839,13 @@ class ReattachExecuteRequest(google.protobuf.message.Message): ], ) -> None: ... @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... @@ -2729,6 +2892,7 @@ class ReleaseExecuteRequest(google.protobuf.message.Message): ) -> None: ... 
SESSION_ID_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int OPERATION_ID_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int @@ -2740,6 +2904,12 @@ class ReleaseExecuteRequest(google.protobuf.message.Message): The session_id of the request to reattach to. This must be an id of existing session. """ + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. + """ @property def user_context(self) -> global___UserContext: """(Required) User context @@ -2765,6 +2935,7 @@ class ReleaseExecuteRequest(google.protobuf.message.Message): self, *, session_id: builtins.str = ..., + client_observed_server_side_session_id: builtins.str | None = ..., user_context: global___UserContext | None = ..., operation_id: builtins.str = ..., client_type: builtins.str | None = ..., @@ -2774,8 +2945,12 @@ class ReleaseExecuteRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "release", @@ -2791,8 +2966,12 @@ class ReleaseExecuteRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "operation_id", @@ -2810,6 +2989,13 @@ class ReleaseExecuteRequest(google.protobuf.message.Message): ], ) -> None: ... 
@typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... @@ -2964,6 +3150,7 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor SESSION_ID_FIELD_NUMBER: builtins.int + CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int ERROR_ID_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int @@ -2972,6 +3159,12 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message): The session_id specifies a Spark session for a user identified by user_context.user_id. The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`. """ + client_observed_server_side_session_id: builtins.str + """(Optional) + + Server-side generated idempotency key from the previous responses (if any). Server + can use this to validate that the server side session has not changed. 
+ """ @property def user_context(self) -> global___UserContext: """User context""" @@ -2988,6 +3181,7 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message): self, *, session_id: builtins.str = ..., + client_observed_server_side_session_id: builtins.str | None = ..., user_context: global___UserContext | None = ..., error_id: builtins.str = ..., client_type: builtins.str | None = ..., @@ -2995,8 +3189,12 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message): def HasField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "user_context", @@ -3006,8 +3204,12 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_client_observed_server_side_session_id", + b"_client_observed_server_side_session_id", "_client_type", b"_client_type", + "client_observed_server_side_session_id", + b"client_observed_server_side_session_id", "client_type", b"client_type", "error_id", @@ -3018,6 +3220,14 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message): b"user_context", ], ) -> None: ... + @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_client_observed_server_side_session_id", b"_client_observed_server_side_session_id" + ], + ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"] ) -> typing_extensions.Literal["client_type"] | None: ... --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org