This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7c741c8c25fc [SPARK-46265][CONNECT] Assertions in AddArtifact RPC make
the connect client incompatible with older clusters
7c741c8c25fc is described below
commit 7c741c8c25fc4fe3d7d5fa5d490bb9b08debd952
Author: Niranjan Jayakar <[email protected]>
AuthorDate: Tue Dec 5 12:43:48 2023 -0400
[SPARK-46265][CONNECT] Assertions in AddArtifact RPC make the connect
client incompatible with older clusters
### What changes were proposed in this pull request?
A previous commit (d9c5f9d6) updated the response of the
`AddArtifact` RPC to return the session ID. It also added an
assertion to the client of this RPC that checks that the session ID
returned by the server matches the session ID that was requested.
However, a Connect client that includes this new assertion may
connect to a Spark cluster that does not yet have this change, so the
change is not backwards compatible.
Loosen the assertion to allow an empty session ID in the RPC's response.
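The difference between the original and the loosened check can be sketched in isolation. This is a minimal sketch, not the code from `ArtifactManager`: the object `SessionIdCheck` and the functions `strictCheck` and `lenientCheck` are hypothetical names, and `lenientCheck` uses a null guard plus `nonEmpty` in place of `StringUtils.isNotEmpty`. It assumes that an older server which never sets the session ID yields an empty string, which is how unset proto3 string fields are returned by generated Java getters.

```scala
object SessionIdCheck {
  // Strict check (as introduced by the earlier commit): any difference is an
  // error, including the empty string returned when an older server never
  // sets the session ID field.
  def strictCheck(expected: String, returned: String): Unit =
    if (returned != expected) {
      throw new IllegalStateException(s"Session ID mismatch: $expected != $returned")
    }

  // Loosened check (hypothetical stand-in for the patched condition): an
  // empty or null session ID is accepted for compatibility with older
  // servers; a non-empty ID must still match the requested one.
  def lenientCheck(expected: String, returned: String): Unit =
    if (returned != null && returned.nonEmpty && returned != expected) {
      throw new IllegalStateException(s"Session ID mismatch: $expected != $returned")
    }
}
```

With these definitions, `lenientCheck("abc", "")` returns normally while `strictCheck("abc", "")` throws, which is the incompatibility this commit addresses; a non-empty mismatching ID such as `"xyz"` is still rejected by both.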
### Why are the changes needed?
So that newer Connect clients can connect to older Spark clusters.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually started a Connect cluster using `spark-connect-shell` and
connected to it with the Connect client shell via
`spark-connect-scala-client`. Changed the service so that it does not
return the session ID, and verified the fix.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44181 from nija-at/artifact-sessionid.
Authored-by: Niranjan Jayakar <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../org/apache/spark/sql/connect/client/ArtifactManager.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index 7a6eb963cb33..3cd35803d1ec 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -125,7 +125,9 @@ class ArtifactManager(
.addAllNames(Arrays.asList(artifactName))
.build()
val response = bstub.artifactStatus(request)
- if (response.getSessionId != sessionId) {
+ if (StringUtils.isNotEmpty(response.getSessionId) && response.getSessionId != sessionId) {
+ // In older versions of the Spark cluster, the session ID is not set in the response.
+ // Ignore this check to keep compatibility.
throw new IllegalStateException(
s"Session ID mismatch: $sessionId != ${response.getSessionId}")
}
@@ -185,7 +187,9 @@ class ArtifactManager(
val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
private val summaries = mutable.Buffer.empty[ArtifactSummary]
override def onNext(v: AddArtifactsResponse): Unit = {
- if (v.getSessionId != sessionId) {
+ if (StringUtils.isNotEmpty(v.getSessionId) && v.getSessionId != sessionId) {
+ // In older versions of the Spark cluster, the session ID is not set in the response.
+ // Ignore this check to keep compatibility.
throw new IllegalStateException(s"Session ID mismatch: $sessionId != ${v.getSessionId}")
}
v.getArtifactsList.forEach { summary =>