This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 51e8634a5883 [SPARK-47380][CONNECT] Ensure on the server side that the SparkSession is the same
51e8634a5883 is described below

commit 51e8634a5883f1816bb82c19b6e91c3516eee6c4
Author: Nemanja Boric <nemanja.bo...@databricks.com>
AuthorDate: Mon Mar 18 15:44:29 2024 -0400

    [SPARK-47380][CONNECT] Ensure on the server side that the SparkSession is the same
    
    ### What changes were proposed in this pull request?
    
    In this PR we change the client behaviour to send the previously observed server-side session id, so that the server can validate that the client has been talking to this specific session. Previously this was validated only on the client side, which meant the server could actually execute the request against the wrong session before the error was thrown on the client side (once the response from the server was obtained).
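    
    A minimal sketch of the round trip this PR introduces (hypothetical `Client`/`Server` types, not the actual Spark Connect classes): the client echoes back the server-side session id it observed in earlier responses, and the server fails the request up front when that id no longer matches its own.
    
    ```scala
    object SessionValidationSketch {
      final case class Request(
          sessionId: String,
          clientObservedServerSideSessionId: Option[String])
      final case class Response(serverSideSessionId: String)
    
      class Server(serverSideSessionId: String) {
        def handle(req: Request): Response = {
          // New in this PR: reject the request if the client has observed a
          // different server-side session id (e.g. the driver restarted).
          req.clientObservedServerSideSessionId.foreach { observed =>
            if (observed != serverSideSessionId) {
              throw new IllegalStateException("INVALID_HANDLE.SESSION_CHANGED")
            }
          }
          Response(serverSideSessionId)
        }
      }
    
      class Client(server: Server) {
        // Set from the first response and echoed on every follow-up request.
        private var observed: Option[String] = None
    
        def execute(sessionId: String): Response = {
          val resp = server.handle(Request(sessionId, observed))
          observed = Some(resp.serverSideSessionId)
          resp
        }
      }
    
      def main(args: Array[String]): Unit = {
        val client = new Client(new Server(serverSideSessionId = "uuid-1"))
        client.execute("session-a") // first request: nothing observed yet
        client.execute("session-a") // follow-up carries the observed id
      }
    }
    ```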
    
    ### Why are the changes needed?
    The server can execute the client's command on the wrong Spark session before the client figures out that it is a different session.
    
    ### Does this PR introduce _any_ user-facing change?
    The error message now surfaces differently (it used to be a slightly different message when the validation happened on the client side).
    
    ### How was this patch tested?
    Existing unit tests, a new unit test, a new e2e test, and manual testing.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45499 from nemanja-boric-databricks/workspace.
    
    Authored-by: Nemanja Boric <nemanja.bo...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |   5 +
 .../src/main/protobuf/spark/connect/base.proto     |  54 ++++
 .../sql/connect/client/ResponseValidator.scala     |  16 +-
 .../sql/connect/client/SparkConnectClient.scala    |  45 ++-
 .../service/SparkConnectAddArtifactsHandler.scala  |   7 +-
 .../service/SparkConnectAnalyzeHandler.scala       |   7 +-
 .../SparkConnectArtifactStatusesHandler.scala      |  18 +-
 .../service/SparkConnectConfigHandler.scala        |   9 +-
 .../service/SparkConnectExecutionManager.scala     |   9 +-
 .../SparkConnectFetchErrorDetailsHandler.scala     |   6 +-
 .../service/SparkConnectInterruptHandler.scala     |   6 +-
 .../SparkConnectReattachExecuteHandler.scala       |   8 +-
 .../SparkConnectReleaseExecuteHandler.scala        |   4 +-
 .../SparkConnectReleaseSessionHandler.scala        |   2 +
 .../sql/connect/service/SparkConnectService.scala  |   9 +-
 .../service/SparkConnectSessionManager.scala       |  29 +-
 .../execution/ReattachableExecuteSuite.scala       |   8 +-
 .../connect/planner/SparkConnectServiceSuite.scala |   8 +-
 .../service/ArtifactStatusesHandlerSuite.scala     |   1 +
 .../service/FetchErrorDetailsHandlerSuite.scala    |  12 +-
 .../service/SparkConnectServiceE2ESuite.scala      |  20 ++
 .../service/SparkConnectSessionManagerSuite.scala  |  38 ++-
 ...-error-conditions-invalid-handle-error-class.md |   4 +
 python/pyspark/sql/connect/client/core.py          |  12 +
 python/pyspark/sql/connect/proto/base_pb2.py       | 328 ++++++++++-----------
 python/pyspark/sql/connect/proto/base_pb2.pyi      | 210 +++++++++++++
 26 files changed, 655 insertions(+), 220 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 9362b8342abf..b5a0089fb2c8 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2083,6 +2083,11 @@
           "Operation not found."
         ]
       },
+      "SESSION_CHANGED" : {
+        "message" : [
+          "The existing Spark server driver instance has restarted. Please 
reconnect."
+        ]
+      },
       "SESSION_CLOSED" : {
         "message" : [
           "Session was closed."
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index cb9dbe62c193..bcc7edc55550 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -66,6 +66,12 @@ message AnalyzePlanRequest {
   // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 17;
+
   // (Required) User context
   UserContext user_context = 2;
 
@@ -281,6 +287,12 @@ message ExecutePlanRequest {
   // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 8;
+
   // (Required) User context
   //
   // user_context.user_id and session+id both identify a unique remote spark session on the
@@ -443,6 +455,12 @@ message ConfigRequest {
   // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 8;
+
   // (Required) User context
   UserContext user_context = 2;
 
@@ -536,6 +554,12 @@ message AddArtifactsRequest {
   // User context
   UserContext user_context = 2;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 7;
+
   // Provides optional information about the client sending the request. This field
   // can be used for language or version specific information and is only intended for
   // logging purposes and will not be interpreted by the server.
@@ -630,6 +654,12 @@ message ArtifactStatusesRequest {
   // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 5;
+
   // User context
   UserContext user_context = 2;
 
@@ -673,6 +703,12 @@ message InterruptRequest {
   // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 7;
+
   // (Required) User context
   UserContext user_context = 2;
 
@@ -738,6 +774,12 @@ message ReattachExecuteRequest {
   // This must be an id of existing session.
   string session_id = 1;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 6;
+
   // (Required) User context
   //
   // user_context.user_id and session+id both identify a unique remote spark session on the
@@ -772,6 +814,12 @@ message ReleaseExecuteRequest {
   // This must be an id of existing session.
   string session_id = 1;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 7;
+
   // (Required) User context
   //
   // user_context.user_id and session+id both identify a unique remote spark session on the
@@ -856,6 +904,12 @@ message FetchErrorDetailsRequest {
   // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
   string session_id = 1;
 
+  // (Optional)
+  //
+  // Server-side generated idempotency key from the previous responses (if any). The
+  // server can use this to validate that the server-side session has not changed.
+  optional string client_observed_server_side_session_id = 5;
+
   // User context
   UserContext user_context = 2;
 
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala
index 22c5505e7d45..29272c96132b 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala
@@ -21,8 +21,8 @@ import io.grpc.stub.StreamObserver
 
 import org.apache.spark.internal.Logging
 
-// This is common logic to be shared between different stub instances to validate responses as
-// seen by the client.
+// This is common logic to be shared between different stub instances to keep the server-side
+// session id and to validate responses as seen by the client.
 class ResponseValidator extends Logging {
 
   // Server side session ID, used to detect if the server side session changed. This is set upon
@@ -30,6 +30,18 @@ class ResponseValidator extends Logging {
   // do not use server-side streaming.
   private var serverSideSessionId: Option[String] = None
 
+  // Returns the server-side session ID, used to send it back to the server in the follow-up
+  // requests so the server can validate its session id against the previous requests.
+  def getServerSideSessionId: Option[String] = serverSideSessionId
+
+  /**
+   * Hijacks the stored server-side session ID by appending the given suffix. Used for testing
+   * to make sure that the server is validating the session ID.
+   */
+  private[sql] def hijackServerSideSessionIdForTesting(suffix: String): Unit = {
+    serverSideSessionId = Some(serverSideSessionId.getOrElse("") + suffix)
+  }
+
   def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
     val response = fn
     val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index cd1dfbd2e734..746aaca6f559 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -63,6 +63,14 @@ private[sql] class SparkConnectClient(
   // a new client will create a new session ID.
   private[sql] val sessionId: String = configuration.sessionId.getOrElse(UUID.randomUUID.toString)
 
+  /**
+   * Hijacks the stored server-side session ID by appending the given suffix. Used for testing
+   * to make sure that the server is validating the session ID.
+   */
+  private[sql] def hijackServerSideSessionIdForTesting(suffix: String) = {
+    stubState.responseValidator.hijackServerSideSessionIdForTesting(suffix)
+  }
+
   private[sql] val artifactManager: ArtifactManager = {
     new ArtifactManager(configuration, sessionId, bstub, stub)
   }
@@ -73,6 +81,14 @@ private[sql] class SparkConnectClient(
   private[sql] def uploadAllClassFileArtifacts(): Unit =
     artifactManager.uploadAllClassFileArtifacts()
 
+  /**
+   * Returns the server-side session id obtained from the first response, if there was a
+   * request already.
+   */
+  private def serverSideSessionId: Option[String] = {
+    stubState.responseValidator.getServerSideSessionId
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
@@ -99,11 +115,11 @@ private[sql] class SparkConnectClient(
       .setSessionId(sessionId)
       .setClientType(userAgent)
       .addAllTags(tags.get.toSeq.asJava)
-      .build()
+    serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session))
     if (configuration.useReattachableExecute) {
-      bstub.executePlanReattachable(request)
+      bstub.executePlanReattachable(request.build())
     } else {
-      bstub.executePlan(request)
+      bstub.executePlan(request.build())
     }
   }
 
@@ -119,8 +135,8 @@ private[sql] class SparkConnectClient(
       .setSessionId(sessionId)
       .setClientType(userAgent)
       .setUserContext(userContext)
-      .build()
-    bstub.config(request)
+    serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session))
+    bstub.config(request.build())
   }
 
   /**
@@ -207,8 +223,8 @@ private[sql] class SparkConnectClient(
       .setUserContext(userContext)
       .setSessionId(sessionId)
       .setClientType(userAgent)
-      .build()
-    analyze(request)
+    serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session))
+    analyze(request.build())
   }
 
   private[sql] def interruptAll(): proto.InterruptResponse = {
@@ -218,8 +234,8 @@ private[sql] class SparkConnectClient(
       .setSessionId(sessionId)
       .setClientType(userAgent)
       .setInterruptType(proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_ALL)
-      .build()
-    bstub.interrupt(request)
+    serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session))
+    bstub.interrupt(request.build())
   }
 
   private[sql] def interruptTag(tag: String): proto.InterruptResponse = {
@@ -230,8 +246,8 @@ private[sql] class SparkConnectClient(
       .setClientType(userAgent)
       .setInterruptType(proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_TAG)
       .setOperationTag(tag)
-      .build()
-    bstub.interrupt(request)
+    serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session))
+    bstub.interrupt(request.build())
   }
 
   private[sql] def interruptOperation(id: String): proto.InterruptResponse = {
@@ -242,8 +258,8 @@ private[sql] class SparkConnectClient(
       .setClientType(userAgent)
       .setInterruptType(proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_OPERATION_ID)
       .setOperationId(id)
-      .build()
-    bstub.interrupt(request)
+    serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session))
+    bstub.interrupt(request.build())
   }
 
   private[sql] def releaseSession(): proto.ReleaseSessionResponse = {
@@ -252,8 +268,7 @@ private[sql] class SparkConnectClient(
       .setUserContext(userContext)
       .setSessionId(sessionId)
       .setClientType(userAgent)
-      .build()
-    bstub.releaseSession(request)
+    bstub.releaseSession(request.build())
   }
 
   private[this] val tags = new InheritableThreadLocal[mutable.Set[String]] {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
index ea3b578be3b0..b0d9337c6448 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
@@ -53,9 +53,14 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr
 
   override def onNext(req: AddArtifactsRequest): Unit = try {
     if (this.holder == null) {
+      val previousSessionId = req.hasClientObservedServerSideSessionId match {
+        case true => Some(req.getClientObservedServerSideSessionId)
+        case false => None
+      }
       this.holder = SparkConnectService.getOrCreateIsolatedSession(
         req.getUserContext.getUserId,
-        req.getSessionId)
+        req.getSessionId,
+        previousSessionId)
     }
 
     if (req.hasBeginChunk) {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index 7a701aea1b78..3dfd29d6a8c6 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -34,9 +34,14 @@ private[connect] class SparkConnectAnalyzeHandler(
     extends Logging {
 
   def handle(request: proto.AnalyzePlanRequest): Unit = {
+    val previousSessionId = request.hasClientObservedServerSideSessionId match {
+      case true => Some(request.getClientObservedServerSideSessionId)
+      case false => None
+    }
     val sessionHolder = SparkConnectService.getOrCreateIsolatedSession(
       request.getUserContext.getUserId,
-      request.getSessionId)
+      request.getSessionId,
+      previousSessionId)
     // `withSession` ensures that session-specific artifacts (such as JARs and class files) are
     // available during processing (such as deserialization).
     sessionHolder.withSession { _ =>
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala
index 78def077f2dd..38bb29efc536 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectArtifactStatusesHandler.scala
@@ -28,17 +28,28 @@ class SparkConnectArtifactStatusesHandler(
     val responseObserver: StreamObserver[proto.ArtifactStatusesResponse])
     extends Logging {
 
-  protected def cacheExists(userId: String, sessionId: String, hash: String): Boolean = {
+  protected def cacheExists(
+      userId: String,
+      sessionId: String,
+      previouslySeenSessionId: Option[String],
+      hash: String): Boolean = {
     val session = SparkConnectService
-      .getOrCreateIsolatedSession(userId, sessionId)
+      .getOrCreateIsolatedSession(userId, sessionId, previouslySeenSessionId)
       .session
     val blockManager = session.sparkContext.env.blockManager
     blockManager.getStatus(CacheId(session.sessionUUID, hash)).isDefined
   }
 
   def handle(request: proto.ArtifactStatusesRequest): Unit = {
+    val previousSessionId = request.hasClientObservedServerSideSessionId match {
+      case true => Some(request.getClientObservedServerSideSessionId)
+      case false => None
+    }
     val holder = SparkConnectService
-      .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
+      .getOrCreateIsolatedSession(
+        request.getUserContext.getUserId,
+        request.getSessionId,
+        previousSessionId)
 
     val builder = proto.ArtifactStatusesResponse.newBuilder()
     builder.setSessionId(holder.sessionId)
@@ -49,6 +60,7 @@ class SparkConnectArtifactStatusesHandler(
         cacheExists(
           userId = request.getUserContext.getUserId,
           sessionId = request.getSessionId,
+          previouslySeenSessionId = previousSessionId,
           hash = name.stripPrefix("cache/"))
       } else false
       builder.putStatuses(name, status.setExists(exists).build())
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
index 8a7ce6d7b48f..6d663867e958 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
@@ -30,9 +30,16 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes
     extends Logging {
 
   def handle(request: proto.ConfigRequest): Unit = {
+    val previousSessionId = request.hasClientObservedServerSideSessionId match {
+      case true => Some(request.getClientObservedServerSideSessionId)
+      case false => None
+    }
     val holder =
       SparkConnectService
-        .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
+        .getOrCreateIsolatedSession(
+          request.getUserContext.getUserId,
+          request.getSessionId,
+          previousSessionId)
     val session = holder.session
 
     val builder = request.getOperation.getOpTypeCase match {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index 58e235f15085..e52cfe64a090 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -67,8 +67,15 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
    * Create a new ExecuteHolder and register it with this global manager and with its session.
    */
   private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): ExecuteHolder = {
+    val previousSessionId = request.hasClientObservedServerSideSessionId match {
+      case true => Some(request.getClientObservedServerSideSessionId)
+      case false => None
+    }
     val sessionHolder = SparkConnectService
-      .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
+      .getOrCreateIsolatedSession(
+        request.getUserContext.getUserId,
+        request.getSessionId,
+        previousSessionId)
     val executeHolder = new ExecuteHolder(request, sessionHolder)
     executionsLock.synchronized {
       // Check if the operation already exists, both in active executions, and in the graveyard
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala
index b5a3c986d169..e461877d8454 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala
@@ -32,9 +32,13 @@ class SparkConnectFetchErrorDetailsHandler(
     responseObserver: StreamObserver[proto.FetchErrorDetailsResponse]) {
 
   def handle(v: proto.FetchErrorDetailsRequest): Unit = {
+    val previousSessionId = v.hasClientObservedServerSideSessionId match {
+      case true => Some(v.getClientObservedServerSideSessionId)
+      case false => None
+    }
     val sessionHolder =
       SparkConnectService
-        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, previousSessionId)
 
     val response = Option(sessionHolder.errorIdToError.getIfPresent(v.getErrorId))
       .map { error =>
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala
index 9e1ab16208f2..ae38e55d3c67 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterruptHandler.scala
@@ -28,9 +28,13 @@ class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.Interr
     extends Logging {
 
   def handle(v: proto.InterruptRequest): Unit = {
+    val previousSessionId = v.hasClientObservedServerSideSessionId match {
+      case true => Some(v.getClientObservedServerSideSessionId)
+      case false => None
+    }
     val sessionHolder =
       SparkConnectService
-        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId, previousSessionId)
 
     val interruptedIds = v.getInterruptType match {
       case proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_ALL =>
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index ecad8c9c73a1..534937f84eae 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -29,8 +29,14 @@ class SparkConnectReattachExecuteHandler(
     extends Logging {
 
   def handle(v: proto.ReattachExecuteRequest): Unit = {
+    val previousSessionId = v.hasClientObservedServerSideSessionId match {
+      case true => Some(v.getClientObservedServerSideSessionId)
+      case false => None
+    }
     val sessionHolder = SparkConnectService.sessionManager
-      .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId))
+      .getIsolatedSession(
+        SessionKey(v.getUserContext.getUserId, v.getSessionId),
+        previousSessionId)
 
     val executeHolder = sessionHolder.executeHolder(v.getOperationId).getOrElse {
       if (SparkConnectService.executionManager
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala
index 88c1456602d3..a2dbf3b2eec9 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala
@@ -28,8 +28,10 @@ class SparkConnectReleaseExecuteHandler(
     extends Logging {
 
   def handle(v: proto.ReleaseExecuteRequest): Unit = {
+    // We do not validate the spark session for ReleaseExecute on the server,
+    // leaving the validation only to happen on the client side.
     val sessionHolder = SparkConnectService.sessionManager
-      .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId))
+      .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId), None)
 
     val responseBuilder = proto.ReleaseExecuteResponse
       .newBuilder()
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseSessionHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseSessionHandler.scala
index c8a3ceab674f..ec7a7f3bd242 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseSessionHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseSessionHandler.scala
@@ -33,8 +33,10 @@ class SparkConnectReleaseSessionHandler(
     // If the session doesn't exist, this will just be a noop.
     val key = SessionKey(v.getUserContext.getUserId, v.getSessionId)
     // if the session is present, update the server-side session ID.
+    // Note we do not validate the previously seen server-side session id.
     val maybeSession = SparkConnectService.sessionManager.getIsolatedSessionIfPresent(key)
     maybeSession.foreach(f => responseBuilder.setServerSideSessionId(f.serverSessionId))
+
     SparkConnectService.sessionManager.closeSession(key)
 
     responseObserver.onNext(responseBuilder.build())
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index e96e5dfcac08..9324e8e6c5f1 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -303,8 +303,13 @@ object SparkConnectService extends Logging {
   /**
    * Based on the userId and sessionId, find or create a new SparkSession.
    */
-  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder = {
-    sessionManager.getOrCreateIsolatedSession(SessionKey(userId, sessionId))
+  def getOrCreateIsolatedSession(
+      userId: String,
+      sessionId: String,
+      previouslyObservedSessionId: Option[String]): SessionHolder = {
+    sessionManager.getOrCreateIsolatedSession(
+      SessionKey(userId, sessionId),
+      previouslyObservedSessionId)
   }
 
   /**
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
index 4da728b95a33..f8febbccfa6f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
@@ -53,11 +53,24 @@ class SparkConnectSessionManager extends Logging {
   /** Executor for the periodic maintenance */
   private var scheduledExecutor: Option[ScheduledExecutorService] = None
 
+  private def validateSessionId(
+      key: SessionKey,
+      sessionUUID: String,
+      previouslyObservedSessionId: String) = {
+    if (sessionUUID != previouslyObservedSessionId) {
+      throw new SparkSQLException(
+        errorClass = "INVALID_HANDLE.SESSION_CHANGED",
+        messageParameters = Map("handle" -> key.sessionId))
+    }
+  }
+
   /**
    * Based on the userId and sessionId, find or create a new SparkSession.
    */
-  private[connect] def getOrCreateIsolatedSession(key: SessionKey): SessionHolder = {
-    getSession(
+  private[connect] def getOrCreateIsolatedSession(
+      key: SessionKey,
+      previouslyObservedSessionId: Option[String]): SessionHolder = {
+    val holder = getSession(
       key,
       Some(() => {
       // Executed under sessionsState lock in getSession,  to guard against concurrent removal
@@ -67,13 +80,18 @@ class SparkConnectSessionManager extends Logging {
         holder.initializeSession()
         holder
       }))
+    previouslyObservedSessionId.foreach(sessionId =>
+      validateSessionId(key, holder.session.sessionUUID, sessionId))
+    holder
   }
 
   /**
    * Based on the userId and sessionId, find an existing SparkSession or throw error.
    */
-  private[connect] def getIsolatedSession(key: SessionKey): SessionHolder = {
-    getSession(
+  private[connect] def getIsolatedSession(
+      key: SessionKey,
+      previouslyObservedSessionId: Option[String]): SessionHolder = {
+    val holder = getSession(
       key,
       Some(() => {
         logDebug(s"Session not found: $key")
@@ -87,6 +105,9 @@ class SparkConnectSessionManager extends Logging {
             messageParameters = Map("handle" -> key.sessionId))
         }
       }))
+    previouslyObservedSessionId.foreach(sessionId =>
+      validateSessionId(key, holder.session.sessionUUID, sessionId))
+    holder
   }
 
   /**
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 3e22dc5c3fad..25e6cc48a199 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -300,7 +300,9 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
   test("SPARK-46186 interrupt directly after query start") {
     // register a sleep udf in the session
     val serverSession =
-      SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session
+      SparkConnectService
+        .getOrCreateIsolatedSession(defaultUserId, defaultSessionId, None)
+        .session
     serverSession.udf.register(
       "sleep",
       ((ms: Int) => {
@@ -384,7 +386,9 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
   test("long sleeping query") {
     // register udf directly on the server, we're not testing client UDFs here...
     val serverSession =
-      SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session
+      SparkConnectService
+        .getOrCreateIsolatedSession(defaultUserId, defaultSessionId, None)
+        .session
     serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms }))
     // query will be sleeping and not returning results, while having multiple reattach
     withSparkEnvConfs(
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 18886fde7e0e..dafcaa9e0225 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -553,7 +553,7 @@ class SparkConnectServiceSuite
       val instance = new SparkConnectService(false)
 
       // Add an always crashing UDF
-      val session = SparkConnectService.getOrCreateIsolatedSession("c1", sessionId).session
+      val session = SparkConnectService.getOrCreateIsolatedSession("c1", sessionId, None).session
       val sleep: Long => Long = { time =>
         Thread.sleep(time)
         time
@@ -624,7 +624,7 @@ class SparkConnectServiceSuite
       val instance = new SparkConnectService(false)
 
       // Add an always crashing UDF
-      val session = SparkConnectService.getOrCreateIsolatedSession("c1", sessionId).session
+      val session = SparkConnectService.getOrCreateIsolatedSession("c1", sessionId, None).session
       val instaKill: Long => Long = { _ =>
         throw new Exception("Kaboom")
       }
@@ -818,7 +818,7 @@ class SparkConnectServiceSuite
             when(restartedQuery.id).thenReturn(DEFAULT_UUID)
            when(restartedQuery.runId).thenReturn(DEFAULT_UUID)
            SparkConnectService.streamingSessionManager.registerNewStreamingQuery(
-              SparkConnectService.getOrCreateIsolatedSession("c1", sessionId),
+              SparkConnectService.getOrCreateIsolatedSession("c1", sessionId, None),
               restartedQuery)
             f(verifyEvents)
           }
@@ -904,7 +904,7 @@ class SparkConnectServiceSuite
         case e: SparkListenerConnectOperationStarted =>
           semaphoreStarted.release()
           val sessionHolder =
-            SparkConnectService.getOrCreateIsolatedSession(e.userId, e.sessionId)
+            SparkConnectService.getOrCreateIsolatedSession(e.userId, e.sessionId, None)
           executeHolder = sessionHolder.executeHolder(e.operationId)
         case _ =>
       }
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ArtifactStatusesHandlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ArtifactStatusesHandlerSuite.scala
index 8fabcf61cb6f..7ce3ff46f553 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ArtifactStatusesHandlerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ArtifactStatusesHandlerSuite.scala
@@ -48,6 +48,7 @@ class ArtifactStatusesHandlerSuite extends SharedSparkSession with ResourceHelpe
       override protected def cacheExists(
           userId: String,
           sessionId: String,
+          previouslySeenSessionId: Option[String],
           hash: String): Boolean = {
         exist.contains(hash)
       }
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala
index ebcd1de60057..33315682bd73 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala
@@ -75,7 +75,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp
       val errorId = UUID.randomUUID().toString()
 
       val sessionHolder = SparkConnectService
-        .getOrCreateIsolatedSession(userId, sessionId)
+        .getOrCreateIsolatedSession(userId, sessionId, None)
 
       sessionHolder.errorIdToError.put(errorId, testError)
 
@@ -125,7 +125,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp
     val errorId = UUID.randomUUID().toString()
 
     SparkConnectService
-      .getOrCreateIsolatedSession(userId, sessionId)
+      .getOrCreateIsolatedSession(userId, sessionId, None)
       .errorIdToError
       .put(errorId, testError)
 
@@ -138,7 +138,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp
 
     assert(
       SparkConnectService
-        .getOrCreateIsolatedSession(userId, sessionId)
+        .getOrCreateIsolatedSession(userId, sessionId, None)
         .errorIdToError
         .size() == 0)
   }
@@ -149,7 +149,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp
       val errorId = UUID.randomUUID().toString()
 
       SparkConnectService
-        .getOrCreateIsolatedSession(userId, sessionId)
+        .getOrCreateIsolatedSession(userId, sessionId, None)
         .errorIdToError
         .put(errorId, testError)
 
@@ -175,7 +175,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp
     val errorId = UUID.randomUUID().toString()
 
     SparkConnectService
-      .getOrCreateIsolatedSession(userId, sessionId)
+      .getOrCreateIsolatedSession(userId, sessionId, None)
       .errorIdToError
       .put(errorId, testError)
 
@@ -192,7 +192,7 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp
     val errorId = UUID.randomUUID().toString()
 
     SparkConnectService
-      .getOrCreateIsolatedSession(userId, sessionId)
+      .getOrCreateIsolatedSession(userId, sessionId, None)
       .errorIdToError
       .put(errorId, testError)
 
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index 7776148077fc..33560cd53f6b 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -202,4 +202,24 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
       }
     }
   }
+
+  test("SessionValidation: server validates that the client is talking to the 
same session.") {
+    val sessionId = UUID.randomUUID.toString()
+    val userId = "Y"
+    withClient(sessionId = sessionId, userId = userId) { client =>
+      // this will create the session, and then ReleaseSession at the end of withClient.
+      val query = client.execute(buildPlan("SELECT 1"))
+      query.hasNext // trigger execution
+      // Same session id.
+      val new_query = client.execute(buildPlan("SELECT 1 + 1"))
+      new_query.hasNext // trigger execution
+      // Change the server session id in the client for testing and try to run something.
+      client.hijackServerSideSessionIdForTesting("-testing")
+      val queryError = intercept[SparkException] {
+        val newest_query = client.execute(buildPlan("SELECT 1 + 1 + 1"))
+        newest_query.hasNext
+      }
+      assert(queryError.getMessage.contains("INVALID_HANDLE.SESSION_CHANGED"))
+    }
+  }
 }
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManagerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManagerSuite.scala
index fadbd9fa502e..42bb93de05e2 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManagerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManagerSuite.scala
@@ -35,7 +35,7 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA
   test("sessionId needs to be an UUID") {
     val key = SessionKey("user", "not an uuid")
     val exGetOrCreate = intercept[SparkSQLException] {
-      SparkConnectService.sessionManager.getOrCreateIsolatedSession(key)
+      SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
     }
     assert(exGetOrCreate.getErrorClass == "INVALID_HANDLE.FORMAT")
   }
@@ -44,33 +44,51 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA
     "getOrCreateIsolatedSession/getIsolatedSession/getIsolatedSessionIfPresent 
" +
       "gets the existing session") {
     val key = SessionKey("user", UUID.randomUUID().toString)
-    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key)
+    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
 
     val sessionGetOrCreate =
-      SparkConnectService.sessionManager.getOrCreateIsolatedSession(key)
+      SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
     assert(sessionGetOrCreate === sessionHolder)
 
-    val sessionGet = SparkConnectService.sessionManager.getIsolatedSession(key)
+    val sessionGet = SparkConnectService.sessionManager.getIsolatedSession(key, None)
     assert(sessionGet === sessionHolder)
 
     val sessionGetIfPresent = SparkConnectService.sessionManager.getIsolatedSessionIfPresent(key)
     assert(sessionGetIfPresent.get === sessionHolder)
   }
 
+  test("client-observed session id validation works") {
+    val key = SessionKey("user", UUID.randomUUID().toString)
+    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
+    // Works if the client doesn't set the observed session id.
+    SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
+    // Works with the correct existing session id.
+    SparkConnectService.sessionManager.getOrCreateIsolatedSession(
+      key,
+      Some(sessionHolder.session.sessionUUID))
+    // Fails with a different session id.
+    val exGet = intercept[SparkSQLException] {
+      SparkConnectService.sessionManager.getOrCreateIsolatedSession(
+        key,
+        Some(sessionHolder.session.sessionUUID + "invalid"))
+    }
+    assert(exGet.getErrorClass == "INVALID_HANDLE.SESSION_CHANGED")
+  }
+
   test(
     "getOrCreateIsolatedSession/getIsolatedSession/getIsolatedSessionIfPresent 
" +
       "doesn't recreate closed session") {
     val key = SessionKey("user", UUID.randomUUID().toString)
-    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key)
+    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
     SparkConnectService.sessionManager.closeSession(key)
 
     val exGetOrCreate = intercept[SparkSQLException] {
-      SparkConnectService.sessionManager.getOrCreateIsolatedSession(key)
+      SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
     }
     assert(exGetOrCreate.getErrorClass == "INVALID_HANDLE.SESSION_CLOSED")
 
     val exGet = intercept[SparkSQLException] {
-      SparkConnectService.sessionManager.getIsolatedSession(key)
+      SparkConnectService.sessionManager.getIsolatedSession(key, None)
     }
     assert(exGet.getErrorClass == "INVALID_HANDLE.SESSION_CLOSED")
 
@@ -82,7 +100,7 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA
     val key = SessionKey("user", UUID.randomUUID().toString)
 
     val exGet = intercept[SparkSQLException] {
-      SparkConnectService.sessionManager.getIsolatedSession(key)
+      SparkConnectService.sessionManager.getIsolatedSession(key, None)
     }
     assert(exGet.getErrorClass == "INVALID_HANDLE.SESSION_NOT_FOUND")
 
@@ -92,7 +110,7 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA
 
   test("SessionHolder with custom expiration time is not cleaned up due to 
inactivity") {
     val key = SessionKey("user", UUID.randomUUID().toString)
-    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key)
+    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
 
     assert(
       SparkConnectService.sessionManager.listActiveSessions.exists(
@@ -117,7 +135,7 @@ class SparkConnectSessionManagerSuite extends SharedSparkSession with BeforeAndA
 
   test("SessionHolder is recorded with status closed after close") {
     val key = SessionKey("user", UUID.randomUUID().toString)
-    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key)
+    val sessionHolder = SparkConnectService.sessionManager.getOrCreateIsolatedSession(key, None)
 
     val activeSessionInfo = SparkConnectService.sessionManager.listActiveSessions.find(
       _.sessionId == sessionHolder.sessionId)
diff --git a/docs/sql-error-conditions-invalid-handle-error-class.md b/docs/sql-error-conditions-invalid-handle-error-class.md
index ef21dd6479bf..953a8e00865d 100644
--- a/docs/sql-error-conditions-invalid-handle-error-class.md
+++ b/docs/sql-error-conditions-invalid-handle-error-class.md
@@ -46,6 +46,10 @@ Operation already exists.
 
 Operation not found.
 
+## SESSION_CHANGED
+
+The existing Spark server driver instance has restarted. Please reconnect.
+
 ## SESSION_CLOSED
 
 Session was closed.
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 2a1f25d83f47..3474d7078564 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1054,6 +1054,8 @@ class SparkConnectClient(object):
             client_type=self._builder.userAgent,
             tags=list(self.get_tags()),
         )
+        if self._server_session_id is not None:
+            req.client_observed_server_side_session_id = self._server_session_id
         if self._user_id:
             req.user_context.user_id = self._user_id
         return req
@@ -1061,6 +1063,8 @@ class SparkConnectClient(object):
     def _analyze_plan_request_with_metadata(self) -> pb2.AnalyzePlanRequest:
         req = pb2.AnalyzePlanRequest()
         req.session_id = self._session_id
+        if self._server_session_id is not None:
+            req.client_observed_server_side_session_id = self._server_session_id
         req.client_type = self._builder.userAgent
         if self._user_id:
             req.user_context.user_id = self._user_id
@@ -1381,6 +1385,8 @@ class SparkConnectClient(object):
     def _config_request_with_metadata(self) -> pb2.ConfigRequest:
         req = pb2.ConfigRequest()
         req.session_id = self._session_id
+        if self._server_session_id is not None:
+            req.client_observed_server_side_session_id = self._server_session_id
         req.client_type = self._builder.userAgent
         if self._user_id:
             req.user_context.user_id = self._user_id
@@ -1416,6 +1422,8 @@ class SparkConnectClient(object):
         The result of the config call.
         """
         req = self._config_request_with_metadata()
+        if self._server_session_id is not None:
+            req.client_observed_server_side_session_id = self._server_session_id
         req.operation.CopyFrom(operation)
         try:
             for attempt in self._retrying():
@@ -1432,6 +1440,8 @@ class SparkConnectClient(object):
     ) -> pb2.InterruptRequest:
         req = pb2.InterruptRequest()
         req.session_id = self._session_id
+        if self._server_session_id is not None:
+            req.client_observed_server_side_session_id = self._server_session_id
         req.client_type = self._builder.userAgent
         if interrupt_type == "all":
+            req.interrupt_type = pb2.InterruptRequest.InterruptType.INTERRUPT_TYPE_ALL
@@ -1591,6 +1601,8 @@ class SparkConnectClient(object):
             client_type=self._builder.userAgent,
             error_id=info.metadata["errorId"],
         )
+        if self._server_session_id is not None:
+            req.client_observed_server_side_session_id = self._server_session_id
         if self._user_id:
             req.user_context.user_id = self._user_id
 
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 1941900ae69d..af42f9b73628 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01
 
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
 
\x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17
 [...]
+    
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01
 
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
 
\x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17
 [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -58,167 +58,167 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _USERCONTEXT._serialized_start = 337
     _USERCONTEXT._serialized_end = 459
     _ANALYZEPLANREQUEST._serialized_start = 462
-    _ANALYZEPLANREQUEST._serialized_end = 2883
-    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1657
-    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1706
-    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1709
-    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 2024
-    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1852
-    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 2024
-    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 2026
-    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 2116
-    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 2118
-    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 2168
-    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 2170
-    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 2224
-    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 2226
-    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 2279
-    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 2281
-    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 2295
-    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 2297
-    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2338
-    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2340
-    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2461
-    _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2463
-    _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2518
-    _ANALYZEPLANREQUEST_PERSIST._serialized_start = 2521
-    _ANALYZEPLANREQUEST_PERSIST._serialized_end = 2672
-    _ANALYZEPLANREQUEST_UNPERSIST._serialized_start = 2674
-    _ANALYZEPLANREQUEST_UNPERSIST._serialized_end = 2784
-    _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_start = 2786
-    _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_end = 2856
-    _ANALYZEPLANRESPONSE._serialized_start = 2886
-    _ANALYZEPLANRESPONSE._serialized_end = 4628
-    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 4047
-    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 4104
-    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 4106
-    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 4154
-    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 4156
-    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 4201
-    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 4203
-    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 4239
-    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 4241
-    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 4289
-    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 4291
-    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 4325
-    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 4327
-    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 4367
-    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 4369
-    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 4428
-    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 4430
-    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 4469
-    _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 4471
-    _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 4509
-    _ANALYZEPLANRESPONSE_PERSIST._serialized_start = 2521
-    _ANALYZEPLANRESPONSE_PERSIST._serialized_end = 2530
-    _ANALYZEPLANRESPONSE_UNPERSIST._serialized_start = 2674
-    _ANALYZEPLANRESPONSE_UNPERSIST._serialized_end = 2685
-    _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_start = 4535
-    _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_end = 4618
-    _EXECUTEPLANREQUEST._serialized_start = 4631
-    _EXECUTEPLANREQUEST._serialized_end = 5175
-    _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_start = 4977
-    _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_end = 5142
-    _EXECUTEPLANRESPONSE._serialized_start = 5178
-    _EXECUTEPLANRESPONSE._serialized_end = 7391
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 6527
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 6598
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 6600
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 6718
-    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 6721
-    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 7238
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 6816
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 7148
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 7025
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 7148
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 7150
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 7238
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 7240
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 7356
-    _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_start = 7358
-    _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_end = 7374
-    _KEYVALUE._serialized_start = 7393
-    _KEYVALUE._serialized_end = 7458
-    _CONFIGREQUEST._serialized_start = 7461
-    _CONFIGREQUEST._serialized_end = 8489
-    _CONFIGREQUEST_OPERATION._serialized_start = 7681
-    _CONFIGREQUEST_OPERATION._serialized_end = 8179
-    _CONFIGREQUEST_SET._serialized_start = 8181
-    _CONFIGREQUEST_SET._serialized_end = 8233
-    _CONFIGREQUEST_GET._serialized_start = 8235
-    _CONFIGREQUEST_GET._serialized_end = 8260
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 8262
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 8325
-    _CONFIGREQUEST_GETOPTION._serialized_start = 8327
-    _CONFIGREQUEST_GETOPTION._serialized_end = 8358
-    _CONFIGREQUEST_GETALL._serialized_start = 8360
-    _CONFIGREQUEST_GETALL._serialized_end = 8408
-    _CONFIGREQUEST_UNSET._serialized_start = 8410
-    _CONFIGREQUEST_UNSET._serialized_end = 8437
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 8439
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 8473
-    _CONFIGRESPONSE._serialized_start = 8492
-    _CONFIGRESPONSE._serialized_end = 8667
-    _ADDARTIFACTSREQUEST._serialized_start = 8670
-    _ADDARTIFACTSREQUEST._serialized_end = 9541
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 9057
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 9110
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 9112
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 9223
-    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 9225
-    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 9318
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 9321
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 9514
-    _ADDARTIFACTSRESPONSE._serialized_start = 9544
-    _ADDARTIFACTSRESPONSE._serialized_end = 9816
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 9735
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 9816
-    _ARTIFACTSTATUSESREQUEST._serialized_start = 9819
-    _ARTIFACTSTATUSESREQUEST._serialized_end = 10014
-    _ARTIFACTSTATUSESRESPONSE._serialized_start = 10017
-    _ARTIFACTSTATUSESRESPONSE._serialized_end = 10369
-    _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_start = 10212
-    _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_end = 10327
-    _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_start = 10329
-    _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_end = 10369
-    _INTERRUPTREQUEST._serialized_start = 10372
-    _INTERRUPTREQUEST._serialized_end = 10844
-    _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_start = 10687
-    _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_end = 10815
-    _INTERRUPTRESPONSE._serialized_start = 10847
-    _INTERRUPTRESPONSE._serialized_end = 10991
-    _REATTACHOPTIONS._serialized_start = 10993
-    _REATTACHOPTIONS._serialized_end = 11046
-    _REATTACHEXECUTEREQUEST._serialized_start = 11049
-    _REATTACHEXECUTEREQUEST._serialized_end = 11324
-    _RELEASEEXECUTEREQUEST._serialized_start = 11327
-    _RELEASEEXECUTEREQUEST._serialized_end = 11781
-    _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_start = 11693
-    _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_end = 11705
-    _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_start = 11707
-    _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_end = 11754
-    _RELEASEEXECUTERESPONSE._serialized_start = 11784
-    _RELEASEEXECUTERESPONSE._serialized_end = 11949
-    _RELEASESESSIONREQUEST._serialized_start = 11952
-    _RELEASESESSIONREQUEST._serialized_end = 12123
-    _RELEASESESSIONRESPONSE._serialized_start = 12125
-    _RELEASESESSIONRESPONSE._serialized_end = 12233
-    _FETCHERRORDETAILSREQUEST._serialized_start = 12236
-    _FETCHERRORDETAILSREQUEST._serialized_end = 12437
-    _FETCHERRORDETAILSRESPONSE._serialized_start = 12440
-    _FETCHERRORDETAILSRESPONSE._serialized_end = 13995
-    _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 12669
-    _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 12843
-    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_start = 12846
-    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_end = 13214
-    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_start = 13177
-    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_end = 13214
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 13217
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 13626
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 13528
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 13596
-    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 13629
-    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 13976
-    _SPARKCONNECTSERVICE._serialized_start = 13998
-    _SPARKCONNECTSERVICE._serialized_end = 14944
+    _ANALYZEPLANREQUEST._serialized_end = 3014
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1745
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1794
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1797
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 2112
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1940
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 2112
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 2114
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 2204
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 2206
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 2256
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 2258
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 2312
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 2314
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 2367
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 2369
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 2383
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 2385
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2426
+    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2428
+    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2549
+    _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2551
+    _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2606
+    _ANALYZEPLANREQUEST_PERSIST._serialized_start = 2609
+    _ANALYZEPLANREQUEST_PERSIST._serialized_end = 2760
+    _ANALYZEPLANREQUEST_UNPERSIST._serialized_start = 2762
+    _ANALYZEPLANREQUEST_UNPERSIST._serialized_end = 2872
+    _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_start = 2874
+    _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_end = 2944
+    _ANALYZEPLANRESPONSE._serialized_start = 3017
+    _ANALYZEPLANRESPONSE._serialized_end = 4759
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 4178
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 4235
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 4237
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 4285
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 4287
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 4332
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 4334
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 4370
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 4372
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 4420
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 4422
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 4456
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 4458
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 4498
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 4500
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 4559
+    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 4561
+    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 4600
+    _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 4602
+    _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 4640
+    _ANALYZEPLANRESPONSE_PERSIST._serialized_start = 2609
+    _ANALYZEPLANRESPONSE_PERSIST._serialized_end = 2618
+    _ANALYZEPLANRESPONSE_UNPERSIST._serialized_start = 2762
+    _ANALYZEPLANRESPONSE_UNPERSIST._serialized_end = 2773
+    _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_start = 4666
+    _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_end = 4749
+    _EXECUTEPLANREQUEST._serialized_start = 4762
+    _EXECUTEPLANREQUEST._serialized_end = 5437
+    _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_start = 5196
+    _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_end = 5361
+    _EXECUTEPLANRESPONSE._serialized_start = 5440
+    _EXECUTEPLANRESPONSE._serialized_end = 7653
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 6789
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 6860
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 6862
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 6980
+    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 6983
+    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 7500
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 7078
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 7410
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 7287
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 7410
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 7412
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 7500
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 7502
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 7618
+    _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_start = 7620
+    _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_end = 7636
+    _KEYVALUE._serialized_start = 7655
+    _KEYVALUE._serialized_end = 7720
+    _CONFIGREQUEST._serialized_start = 7723
+    _CONFIGREQUEST._serialized_end = 8882
+    _CONFIGREQUEST_OPERATION._serialized_start = 8031
+    _CONFIGREQUEST_OPERATION._serialized_end = 8529
+    _CONFIGREQUEST_SET._serialized_start = 8531
+    _CONFIGREQUEST_SET._serialized_end = 8583
+    _CONFIGREQUEST_GET._serialized_start = 8585
+    _CONFIGREQUEST_GET._serialized_end = 8610
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 8612
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 8675
+    _CONFIGREQUEST_GETOPTION._serialized_start = 8677
+    _CONFIGREQUEST_GETOPTION._serialized_end = 8708
+    _CONFIGREQUEST_GETALL._serialized_start = 8710
+    _CONFIGREQUEST_GETALL._serialized_end = 8758
+    _CONFIGREQUEST_UNSET._serialized_start = 8760
+    _CONFIGREQUEST_UNSET._serialized_end = 8787
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 8789
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 8823
+    _CONFIGRESPONSE._serialized_start = 8885
+    _CONFIGRESPONSE._serialized_end = 9060
+    _ADDARTIFACTSREQUEST._serialized_start = 9063
+    _ADDARTIFACTSREQUEST._serialized_end = 10065
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 9538
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 9591
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 9593
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 9704
+    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 9706
+    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 9799
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 9802
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 9995
+    _ADDARTIFACTSRESPONSE._serialized_start = 10068
+    _ADDARTIFACTSRESPONSE._serialized_end = 10340
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 10259
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 10340
+    _ARTIFACTSTATUSESREQUEST._serialized_start = 10343
+    _ARTIFACTSTATUSESREQUEST._serialized_end = 10669
+    _ARTIFACTSTATUSESRESPONSE._serialized_start = 10672
+    _ARTIFACTSTATUSESRESPONSE._serialized_end = 11024
+    _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_start = 10867
+    _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_end = 10982
+    _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_start = 10984
+    _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_end = 11024
+    _INTERRUPTREQUEST._serialized_start = 11027
+    _INTERRUPTREQUEST._serialized_end = 11630
+    _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_start = 11430
+    _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_end = 11558
+    _INTERRUPTRESPONSE._serialized_start = 11633
+    _INTERRUPTRESPONSE._serialized_end = 11777
+    _REATTACHOPTIONS._serialized_start = 11779
+    _REATTACHOPTIONS._serialized_end = 11832
+    _REATTACHEXECUTEREQUEST._serialized_start = 11835
+    _REATTACHEXECUTEREQUEST._serialized_end = 12241
+    _RELEASEEXECUTEREQUEST._serialized_start = 12244
+    _RELEASEEXECUTEREQUEST._serialized_end = 12829
+    _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_start = 12698
+    _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_end = 12710
+    _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_start = 12712
+    _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_end = 12759
+    _RELEASEEXECUTERESPONSE._serialized_start = 12832
+    _RELEASEEXECUTERESPONSE._serialized_end = 12997
+    _RELEASESESSIONREQUEST._serialized_start = 13000
+    _RELEASESESSIONREQUEST._serialized_end = 13171
+    _RELEASESESSIONRESPONSE._serialized_start = 13173
+    _RELEASESESSIONRESPONSE._serialized_end = 13281
+    _FETCHERRORDETAILSREQUEST._serialized_start = 13284
+    _FETCHERRORDETAILSREQUEST._serialized_end = 13616
+    _FETCHERRORDETAILSRESPONSE._serialized_start = 13619
+    _FETCHERRORDETAILSRESPONSE._serialized_end = 15174
+    _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 13848
+    _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 14022
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_start = 14025
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_end = 14393
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_start = 14356
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_end = 14393
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 14396
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 14805
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 14707
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 14775
+    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 14808
+    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 15155
+    _SPARKCONNECTSERVICE._serialized_start = 15177
+    _SPARKCONNECTSERVICE._serialized_end = 16123
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 5ed2d207aca5..86dec5711ece 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -478,6 +478,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         ) -> None: ...
 
     SESSION_ID_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
     SCHEMA_FIELD_NUMBER: builtins.int
@@ -501,6 +502,12 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     collate streaming responses from different queries within the dedicated session.
     The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
     """
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     @property
     def user_context(self) -> global___UserContext:
         """(Required) User context"""
@@ -539,6 +546,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         self,
         *,
         session_id: builtins.str = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         user_context: global___UserContext | None = ...,
         client_type: builtins.str | None = ...,
         schema: global___AnalyzePlanRequest.Schema | None = ...,
@@ -558,10 +566,14 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
             "analyze",
             b"analyze",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "ddl_parse",
@@ -597,10 +609,14 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
             "analyze",
             b"analyze",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "ddl_parse",
@@ -636,6 +652,13 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         ],
     ) -> None: ...
     @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
@@ -1060,6 +1083,7 @@ class ExecutePlanRequest(google.protobuf.message.Message):
         ) -> typing_extensions.Literal["reattach_options", "extension"] | 
None: ...
 
     SESSION_ID_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     OPERATION_ID_FIELD_NUMBER: builtins.int
     PLAN_FIELD_NUMBER: builtins.int
@@ -1074,6 +1098,12 @@ class ExecutePlanRequest(google.protobuf.message.Message):
     collate streaming responses from different queries within the dedicated session.
     The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
     """
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     @property
     def user_context(self) -> global___UserContext:
         """(Required) User context
@@ -1116,6 +1146,7 @@ class ExecutePlanRequest(google.protobuf.message.Message):
         self,
         *,
         session_id: builtins.str = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         user_context: global___UserContext | None = ...,
         operation_id: builtins.str | None = ...,
         plan: global___Plan | None = ...,
@@ -1127,10 +1158,14 @@ class ExecutePlanRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
             "_operation_id",
             b"_operation_id",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "operation_id",
@@ -1144,10 +1179,14 @@ class ExecutePlanRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
             "_operation_id",
             b"_operation_id",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "operation_id",
@@ -1165,6 +1204,13 @@ class ExecutePlanRequest(google.protobuf.message.Message):
         ],
     ) -> None: ...
     @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
@@ -1836,6 +1882,7 @@ class ConfigRequest(google.protobuf.message.Message):
         def ClearField(self, field_name: typing_extensions.Literal["keys", b"keys"]) -> None: ...
 
     SESSION_ID_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     OPERATION_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
@@ -1847,6 +1894,12 @@ class ConfigRequest(google.protobuf.message.Message):
     collate streaming responses from different queries within the dedicated session.
     The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
     """
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     @property
     def user_context(self) -> global___UserContext:
         """(Required) User context"""
@@ -1862,6 +1915,7 @@ class ConfigRequest(google.protobuf.message.Message):
         self,
         *,
         session_id: builtins.str = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         user_context: global___UserContext | None = ...,
         operation: global___ConfigRequest.Operation | None = ...,
         client_type: builtins.str | None = ...,
@@ -1869,8 +1923,12 @@ class ConfigRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "operation",
@@ -1882,8 +1940,12 @@ class ConfigRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "operation",
@@ -1894,6 +1956,14 @@ class ConfigRequest(google.protobuf.message.Message):
             b"user_context",
         ],
     ) -> None: ...
+    @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
@@ -2090,6 +2160,7 @@ class AddArtifactsRequest(google.protobuf.message.Message):
 
     SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
     BATCH_FIELD_NUMBER: builtins.int
     BEGIN_CHUNK_FIELD_NUMBER: builtins.int
@@ -2105,6 +2176,12 @@ class AddArtifactsRequest(google.protobuf.message.Message):
     @property
     def user_context(self) -> global___UserContext:
         """User context"""
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     client_type: builtins.str
     """Provides optional information about the client sending the request. 
This field
     can be used for language or version specific information and is only 
intended for
@@ -2128,6 +2205,7 @@ class AddArtifactsRequest(google.protobuf.message.Message):
         *,
         session_id: builtins.str = ...,
         user_context: global___UserContext | None = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         client_type: builtins.str | None = ...,
         batch: global___AddArtifactsRequest.Batch | None = ...,
+        begin_chunk: global___AddArtifactsRequest.BeginChunkedArtifact | None = ...,
@@ -2136,6 +2214,8 @@ class AddArtifactsRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
             "batch",
@@ -2144,6 +2224,8 @@ class AddArtifactsRequest(google.protobuf.message.Message):
             b"begin_chunk",
             "chunk",
             b"chunk",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "payload",
@@ -2155,6 +2237,8 @@ class AddArtifactsRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
             "batch",
@@ -2163,6 +2247,8 @@ class AddArtifactsRequest(google.protobuf.message.Message):
             b"begin_chunk",
             "chunk",
             b"chunk",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "payload",
@@ -2174,6 +2260,13 @@ class AddArtifactsRequest(google.protobuf.message.Message):
         ],
     ) -> None: ...
     @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
@@ -2262,6 +2355,7 @@ class ArtifactStatusesRequest(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
     SESSION_ID_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
     NAMES_FIELD_NUMBER: builtins.int
@@ -2273,6 +2367,12 @@ class ArtifactStatusesRequest(google.protobuf.message.Message):
     collate streaming responses from different queries within the dedicated session.
     The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
     """
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     @property
     def user_context(self) -> global___UserContext:
         """User context"""
@@ -2296,6 +2396,7 @@ class ArtifactStatusesRequest(google.protobuf.message.Message):
         self,
         *,
         session_id: builtins.str = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         user_context: global___UserContext | None = ...,
         client_type: builtins.str | None = ...,
         names: collections.abc.Iterable[builtins.str] | None = ...,
@@ -2303,8 +2404,12 @@ class ArtifactStatusesRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "user_context",
@@ -2314,8 +2419,12 @@ class ArtifactStatusesRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "names",
@@ -2326,6 +2435,14 @@ class ArtifactStatusesRequest(google.protobuf.message.Message):
             b"user_context",
         ],
     ) -> None: ...
+    @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
@@ -2447,6 +2564,7 @@ class InterruptRequest(google.protobuf.message.Message):
     """Interrupt the running execution within the session with the provided 
operation_id."""
 
     SESSION_ID_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
     INTERRUPT_TYPE_FIELD_NUMBER: builtins.int
@@ -2460,6 +2578,12 @@ class InterruptRequest(google.protobuf.message.Message):
     collate streaming responses from different queries within the dedicated session.
     The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`
     """
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     @property
     def user_context(self) -> global___UserContext:
         """(Required) User context"""
@@ -2478,6 +2602,7 @@ class InterruptRequest(google.protobuf.message.Message):
         self,
         *,
         session_id: builtins.str = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         user_context: global___UserContext | None = ...,
         client_type: builtins.str | None = ...,
+        interrupt_type: global___InterruptRequest.InterruptType.ValueType = ...,
@@ -2487,8 +2612,12 @@ class InterruptRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "interrupt",
@@ -2504,8 +2633,12 @@ class InterruptRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "interrupt",
@@ -2523,6 +2656,13 @@ class InterruptRequest(google.protobuf.message.Message):
         ],
     ) -> None: ...
     @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
@@ -2602,6 +2742,7 @@ class ReattachExecuteRequest(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
     SESSION_ID_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     OPERATION_ID_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
@@ -2612,6 +2753,12 @@ class ReattachExecuteRequest(google.protobuf.message.Message):
     The session_id of the request to reattach to.
     This must be an id of existing session.
     """
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     @property
     def user_context(self) -> global___UserContext:
         """(Required) User context
@@ -2643,6 +2790,7 @@ class ReattachExecuteRequest(google.protobuf.message.Message):
         self,
         *,
         session_id: builtins.str = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         user_context: global___UserContext | None = ...,
         operation_id: builtins.str = ...,
         client_type: builtins.str | None = ...,
@@ -2651,10 +2799,14 @@ class ReattachExecuteRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
             "_last_response_id",
             b"_last_response_id",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "last_response_id",
@@ -2666,10 +2818,14 @@ class ReattachExecuteRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
             "_last_response_id",
             b"_last_response_id",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "last_response_id",
@@ -2683,6 +2839,13 @@ class ReattachExecuteRequest(google.protobuf.message.Message):
         ],
     ) -> None: ...
     @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
@@ -2729,6 +2892,7 @@ class ReleaseExecuteRequest(google.protobuf.message.Message):
         ) -> None: ...
 
     SESSION_ID_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     OPERATION_ID_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
@@ -2740,6 +2904,12 @@ class ReleaseExecuteRequest(google.protobuf.message.Message):
     The session_id of the request to reattach to.
     This must be an id of existing session.
     """
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     @property
     def user_context(self) -> global___UserContext:
         """(Required) User context
@@ -2765,6 +2935,7 @@ class ReleaseExecuteRequest(google.protobuf.message.Message):
         self,
         *,
         session_id: builtins.str = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         user_context: global___UserContext | None = ...,
         operation_id: builtins.str = ...,
         client_type: builtins.str | None = ...,
@@ -2774,8 +2945,12 @@ class ReleaseExecuteRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "release",
@@ -2791,8 +2966,12 @@ class ReleaseExecuteRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "operation_id",
@@ -2810,6 +2989,13 @@ class ReleaseExecuteRequest(google.protobuf.message.Message):
         ],
     ) -> None: ...
     @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
@@ -2964,6 +3150,7 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
     SESSION_ID_FIELD_NUMBER: builtins.int
+    CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     ERROR_ID_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
@@ -2972,6 +3159,12 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message):
     The session_id specifies a Spark session for a user identified by user_context.user_id.
     The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
     """
+    client_observed_server_side_session_id: builtins.str
+    """(Optional)
+
+    Server-side generated idempotency key from the previous responses (if any). Server
+    can use this to validate that the server side session has not changed.
+    """
     @property
     def user_context(self) -> global___UserContext:
         """User context"""
@@ -2988,6 +3181,7 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message):
         self,
         *,
         session_id: builtins.str = ...,
+        client_observed_server_side_session_id: builtins.str | None = ...,
         user_context: global___UserContext | None = ...,
         error_id: builtins.str = ...,
         client_type: builtins.str | None = ...,
@@ -2995,8 +3189,12 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "user_context",
@@ -3006,8 +3204,12 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_client_observed_server_side_session_id",
+            b"_client_observed_server_side_session_id",
             "_client_type",
             b"_client_type",
+            "client_observed_server_side_session_id",
+            b"client_observed_server_side_session_id",
             "client_type",
             b"client_type",
             "error_id",
@@ -3018,6 +3220,14 @@ class FetchErrorDetailsRequest(google.protobuf.message.Message):
             b"user_context",
         ],
     ) -> None: ...
+    @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_client_observed_server_side_session_id", 
b"_client_observed_server_side_session_id"
+        ],
+    ) -> typing_extensions.Literal["client_observed_server_side_session_id"] | 
None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
     ) -> typing_extensions.Literal["client_type"] | None: ...
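
For readers of the generated stubs above, a minimal sketch of how a client could thread the new field through a request. Illustrative only: the helper function and its wiring are hypothetical and not part of this commit; the message and field names come from the stubs above, and the idempotency key is assumed to have been remembered from the server_side_session_id of an earlier response.

    # Hypothetical helper, not part of this commit: populates the new
    # client_observed_server_side_session_id field on an ExecutePlanRequest
    # so the server can reject the call if its session has been replaced.
    from typing import Optional

    import pyspark.sql.connect.proto.base_pb2 as pb2

    def build_execute_request(
        session_id: str,
        observed_server_session_id: Optional[str],
        plan: pb2.Plan,
    ) -> pb2.ExecutePlanRequest:
        req = pb2.ExecutePlanRequest()
        req.session_id = session_id
        if observed_server_session_id is not None:
            # Idempotency key remembered from earlier responses; the server
            # validates it before executing the request.
            req.client_observed_server_side_session_id = observed_server_session_id
        req.plan.CopyFrom(plan)
        return req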

