This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fac11c494706 [SPARK-55291][CONNECT] Pre-process metadata headers at client interceptor construction time
fac11c494706 is described below
commit fac11c494706f351a51143cc9b7edc0ca0f28bcc
Author: Yihong He <[email protected]>
AuthorDate: Thu Feb 5 00:01:31 2026 +0800
[SPARK-55291][CONNECT] Pre-process metadata headers at client interceptor construction time
### What changes were proposed in this pull request?
Refactored MetadataHeaderClientInterceptor to pre-process metadata headers
(key creation and Base64 decoding) at construction time instead of on every RPC
call.
### Why are the changes needed?
- Avoids redundant processing on every RPC
- Catches invalid Base64-encoded binary headers earlier at construction time
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite -- -z SPARK-55243"`
### Was this patch authored or co-authored using generative AI tooling?
Yes
Closes #54074 from heyihong/SPARK-55291.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../connect/client/SparkConnectClientSuite.scala | 6 ++---
.../sql/connect/client/SparkConnectClient.scala | 29 +++++++++++++++-------
2 files changed, 22 insertions(+), 13 deletions(-)
diff --git
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 1f0e7b89ddc7..1cfc2d3eb09f 100644
---
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -693,11 +693,9 @@ class SparkConnectClientSuite extends ConnectFunSuite {
assert(new String(bytes, UTF_8) == binaryData)
}
- // Non base64-encoded -bin header throws IllegalArgumentException.
- client = buildClientWithHeader(keyName, binaryData)
-
+ // Non base64-encoded -bin header throws IllegalArgumentException at construction time.
assertThrows[IllegalArgumentException] {
- client.execute(plan)
+ buildClientWithHeader(keyName, binaryData)
}
// Non -bin headers keep using the ASCII marshaller.
diff --git
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index cac43c2cb67c..0fa7d9ada48b 100644
---
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -1094,7 +1094,23 @@ object SparkConnectClient {
*/
private[client] class MetadataHeaderClientInterceptor(metadata: Map[String, String])
extends ClientInterceptor {
- metadata.foreach { case (key, value) => assert(key != null && value != null) }
+
+ // Sealed trait for pre-processed metadata entries
+ private sealed trait MetadataEntry
+ private case class AsciiEntry(key: Metadata.Key[String], value: String) extends MetadataEntry
+ private case class BinaryEntry(key: Metadata.Key[Array[Byte]], value: Array[Byte])
+ extends MetadataEntry
+
+ // Pre-process metadata at construction time
+ private val entries: Seq[MetadataEntry] = metadata.map { case (key, value) =>
+ assert(key != null && value != null)
+ if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ val valueByteArray = Base64.getDecoder.decode(value.getBytes(UTF_8))
+ BinaryEntry(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER), valueByteArray)
+ } else {
+ AsciiEntry(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
+ }
+ }.toSeq
override def interceptCall[ReqT, RespT](
method: MethodDescriptor[ReqT, RespT],
@@ -1105,14 +1121,9 @@ object SparkConnectClient {
override def start(
responseListener: ClientCall.Listener[RespT],
headers: Metadata): Unit = {
- metadata.foreach { case (key, value) =>
- if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
- // Expects a base64-encoded value string.
- val valueByteArray =
Base64.getDecoder.decode(value.getBytes(UTF_8))
- headers.put(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER), valueByteArray)
- } else {
- headers.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
- }
+ entries.foreach {
+ case AsciiEntry(key, value) => headers.put(key, value)
+ case BinaryEntry(key, value) => headers.put(key, value)
}
super.start(responseListener, headers)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]