This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 61b85aeabe5 [SPARK-42816][CONNECT] Support Max Message size up to 128MB
61b85aeabe5 is described below
commit 61b85aeabe50453043b0b206d954b3018b134f6a
Author: Martin Grund <[email protected]>
AuthorDate: Tue Mar 21 18:12:57 2023 -0700
[SPARK-42816][CONNECT] Support Max Message size up to 128MB
### What changes were proposed in this pull request?
This change raises the default maximum gRPC message size from 4MB to 128MB
and makes it configurable. While 128MB is a somewhat arbitrary number, it is
large enough to support creating DataFrames from reasonably sized local data
without failing.
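As a rough sketch of what the higher limit enables (hypothetical client code,
assuming a Connect server is already running at sc://localhost; it mirrors the
test added in this patch):

    from pyspark.sql import SparkSession

    # ~25MB of local rows, which would have exceeded the previous
    # 4MB gRPC message limit when uploaded to the server.
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
    rows = [["abcdefghijklmnoprstuvwxyz"] * 10] * (100 * 1000)
    assert spark.createDataFrame(data=rows).count() == 100 * 1000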
### Why are the changes needed?
Usability
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual
Closes #40447 from grundprinzip/SPARK-42816.
Lead-authored-by: Martin Grund <[email protected]>
Co-authored-by: Martin Grund <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit da19e0de05b4fbccae2c21385e67256ff31b1f1a)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/connect/client/SparkConnectClient.scala | 1 +
.../spark/sql/connect/common/config/ConnectCommon.scala | 1 +
.../scala/org/apache/spark/sql/connect/config/Connect.scala | 8 ++++++++
.../spark/sql/connect/service/SparkConnectService.scala | 3 ++-
docs/configuration.md | 8 ++++++++
python/pyspark/sql/connect/client.py | 12 +++++++++++-
python/pyspark/sql/tests/connect/test_connect_basic.py | 9 +++++++++
7 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 60d6d202ff5..a298c526883 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -406,6 +406,7 @@ private[sql] object SparkConnectClient {
     if (metadata.nonEmpty) {
       channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata))
     }
+    channelBuilder.maxInboundMessageSize(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
     new SparkConnectClient(
       userContextBuilder.build(),
       channelBuilder,
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala
index 48ae4d2d77f..3f594d79b62 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala
@@ -18,4 +18,5 @@ package org.apache.spark.sql.connect.common.config
 private[connect] object ConnectCommon {
   val CONNECT_GRPC_BINDING_PORT: Int = 15002
+  val CONNECT_GRPC_MAX_MESSAGE_SIZE: Int = 128 * 1024 * 1024
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 64b5bd5d813..19fdad97b5f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -47,6 +47,14 @@ object Connect {
       .bytesConf(ByteUnit.MiB)
       .createWithDefaultString("4m")

+  val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
+    ConfigBuilder("spark.connect.grpc.maxInboundMessageSize")
+      .doc("Sets the maximum inbound message size in bytes for gRPC requests. " +
+        "Requests with a larger payload will fail.")
+      .version("3.4.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
+
   val CONNECT_EXTENSIONS_RELATION_CLASSES =
     ConfigBuilder("spark.connect.extensions.relation.classes")
       .doc("""
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index cd353b6ff60..a9442a8c92c 100755
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -42,7 +42,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_BINDING_PORT
+import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
/**
* The SparkConnectService implementation.
@@ -276,6 +276,7 @@ object SparkConnectService {
     val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
     val sb = NettyServerBuilder
       .forPort(port)
+      .maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
       .addService(new SparkConnectService(debugMode))
     // Add all registered interceptors to the server builder.
diff --git a/docs/configuration.md b/docs/configuration.md
index 0ca53acbeb6..4133fd5d441 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3171,6 +3171,14 @@ They are typically set via the config file and command-line options with `--conf/
   <td>When using Apache Arrow, limit the maximum size of one arrow batch that can be sent from server side to client side. Currently, we conservatively use 70% of it because the size is not accurate but estimated.</td>
<td>3.4.0</td>
</tr>
+<tr>
+  <td><code>spark.connect.grpc.maxInboundMessageSize</code></td>
+  <td>
+    134217728
+  </td>
+  <td>Sets the maximum inbound message size for gRPC requests. Requests with a larger payload will fail.</td>
+  <td>3.4.0</td>
+</tr>
<tr>
<td><code>spark.connect.extensions.relation.classes</code></td>
<td>
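Since the config is declared with bytesConf(ByteUnit.BYTE), the value is a
plain byte count (the default above, 134217728, is 128MB). A server operator
could raise it like any other Spark conf when launching the Connect server,
e.g. with a hypothetical value doubling the default:

    --conf spark.connect.grpc.maxInboundMessageSize=268435456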
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 53fa97372a7..4db852c951e 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -123,6 +123,7 @@ class ChannelBuilder:
     PARAM_TOKEN = "token"
     PARAM_USER_ID = "user_id"
     PARAM_USER_AGENT = "user_agent"
+    MAX_MESSAGE_LENGTH = 128 * 1024 * 1024
@staticmethod
def default_port() -> int:
@@ -177,7 +178,16 @@ class ChannelBuilder:
f"Path component for connection URI must be empty:
{self.url.path}"
)
self._extract_attributes()
-        self._channel_options = channelOptions
+
+        GRPC_DEFAULT_OPTIONS = [
+            ("grpc.max_send_message_length", ChannelBuilder.MAX_MESSAGE_LENGTH),
+            ("grpc.max_receive_message_length", ChannelBuilder.MAX_MESSAGE_LENGTH),
+        ]
+
+        if channelOptions is None:
+            self._channel_options = GRPC_DEFAULT_OPTIONS
+        else:
+            self._channel_options = GRPC_DEFAULT_OPTIONS + channelOptions

     def _extract_attributes(self) -> None:
         if len(self.url.params) > 0:
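Because caller-supplied options are appended after GRPC_DEFAULT_OPTIONS, the
patch's apparent intent is that per-channel settings can still override the
defaults. A minimal sketch of that usage (the URL and size are made-up
values):

    from pyspark.sql.connect.client import ChannelBuilder

    # Hypothetical example: ask gRPC for a 256MB receive limit on this
    # channel; the tuple is appended after the 128MB defaults above.
    builder = ChannelBuilder(
        "sc://localhost",
        [("grpc.max_receive_message_length", 256 * 1024 * 1024)],
    )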
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 491865ad9c9..0a47dc193c9 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -2934,6 +2934,15 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         self.assertEqual(cdf2.schema, sdf2.schema)
         self.assertEqual(cdf2.collect(), sdf2.collect())

+    def test_large_client_data(self):
+        # SPARK-42816: support more than 4MB message size.
+        # Each row is roughly 250 bytes.
+        cols = ["abcdefghijklmnoprstuvwxyz" for x in range(10)]
+        # 100k rows => ~25MB, well above the old 4MB limit.
+        row_count = 100 * 1000
+        rows = [cols] * row_count
+        self.assertEqual(row_count, self.connect.createDataFrame(data=rows).count())
+
     def test_unsupported_jvm_attribute(self):
         # Unsupported jvm attributes for Spark session.
         unsupported_attrs = ["_jsc", "_jconf", "_jvm", "_jsparkSession"]