This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 61b85aeabe5 [SPARK-42816][CONNECT] Support Max Message size up to 128MB 61b85aeabe5 is described below commit 61b85aeabe50453043b0b206d954b3018b134f6a Author: Martin Grund <martin.gr...@databricks.com> AuthorDate: Tue Mar 21 18:12:57 2023 -0700 [SPARK-42816][CONNECT] Support Max Message size up to 128MB ### What changes were proposed in this pull request? This change lifts the default message size of 4MB to 128MB and makes it configurable. While 128MB is a "random number" it supports creating DataFrames from reasonably sized local data without failing. ### Why are the changes needed? Usability ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual Closes #40447 from grundprinzip/SPARK-42816. Lead-authored-by: Martin Grund <martin.gr...@databricks.com> Co-authored-by: Martin Grund <grundprin...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit da19e0de05b4fbccae2c21385e67256ff31b1f1a) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/sql/connect/client/SparkConnectClient.scala | 1 + .../spark/sql/connect/common/config/ConnectCommon.scala | 1 + .../scala/org/apache/spark/sql/connect/config/Connect.scala | 8 ++++++++ .../spark/sql/connect/service/SparkConnectService.scala | 3 ++- docs/configuration.md | 8 ++++++++ python/pyspark/sql/connect/client.py | 12 +++++++++++- python/pyspark/sql/tests/connect/test_connect_basic.py | 9 +++++++++ 7 files changed, 40 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index 60d6d202ff5..a298c526883 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -406,6 +406,7 @@ private[sql] object SparkConnectClient { if (metadata.nonEmpty) { channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata)) } + channelBuilder.maxInboundMessageSize(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE) new SparkConnectClient( userContextBuilder.build(), channelBuilder, diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala index 48ae4d2d77f..3f594d79b62 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala @@ -18,4 +18,5 @@ package org.apache.spark.sql.connect.common.config private[connect] object ConnectCommon { val CONNECT_GRPC_BINDING_PORT: Int = 15002 + val CONNECT_GRPC_MAX_MESSAGE_SIZE: Int = 128 * 1024 * 1024; } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 64b5bd5d813..19fdad97b5f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -47,6 +47,14 @@ object Connect { .bytesConf(ByteUnit.MiB) .createWithDefaultString("4m") + val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE = + ConfigBuilder("spark.connect.grpc.maxInboundMessageSize") + .doc("Sets the maximum inbound message size in bytes for the gRPC requests." 
+ + "Requests with a larger payload will fail.") + .version("3.4.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE) + val CONNECT_EXTENSIONS_RELATION_CLASSES = ConfigBuilder("spark.connect.extensions.relation.classes") .doc(""" diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index cd353b6ff60..a9442a8c92c 100755 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -42,7 +42,7 @@ import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_BINDING_PORT +import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE} /** * The SparkConnectService implementation. @@ -276,6 +276,7 @@ object SparkConnectService { val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT) val sb = NettyServerBuilder .forPort(port) + .maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt) .addService(new SparkConnectService(debugMode)) // Add all registered interceptors to the server builder. diff --git a/docs/configuration.md b/docs/configuration.md index 0ca53acbeb6..4133fd5d441 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -3171,6 +3171,14 @@ They are typically set via the config file and command-line options with `--conf/ <td>When using Apache Arrow, limit the maximum size of one arrow batch that can be sent from server side to client side. 
Currently, we conservatively use 70% of it because the size is not accurate but estimated.</td> <td>3.4.0</td> </tr> +<tr> + <td><code>spark.connect.grpc.maxInboundMessageSize</code></td> + <td> + 134217728 + </td> + <td>Sets the maximum inbound message size for the gRPC requests. Requests with a larger payload will fail.</td> + <td>3.4.0</td> +</tr> <tr> <td><code>spark.connect.extensions.relation.classes</code></td> <td> diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index 53fa97372a7..4db852c951e 100644 --- a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -123,6 +123,7 @@ class ChannelBuilder: PARAM_TOKEN = "token" PARAM_USER_ID = "user_id" PARAM_USER_AGENT = "user_agent" + MAX_MESSAGE_LENGTH = 128 * 1024 * 1024 @staticmethod def default_port() -> int: @@ -177,7 +178,16 @@ class ChannelBuilder: f"Path component for connection URI must be empty: {self.url.path}" ) self._extract_attributes() - self._channel_options = channelOptions + + GRPC_DEFAULT_OPTIONS = [ + ("grpc.max_send_message_length", ChannelBuilder.MAX_MESSAGE_LENGTH), + ("grpc.max_receive_message_length", ChannelBuilder.MAX_MESSAGE_LENGTH), + ] + + if channelOptions is None: + self._channel_options = GRPC_DEFAULT_OPTIONS + else: + self._channel_options = GRPC_DEFAULT_OPTIONS + channelOptions def _extract_attributes(self) -> None: if len(self.url.params) > 0: diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index 491865ad9c9..0a47dc193c9 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -2934,6 +2934,15 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): self.assertEqual(cdf2.schema, sdf2.schema) self.assertEqual(cdf2.collect(), sdf2.collect()) + def test_large_client_data(self): + # SPARK-42816 support more than 4MB message size. 
+ # ~200bytes + cols = ["abcdefghijklmnoprstuvwxyz" for x in range(10)] + # 100k rows => 20MB + row_count = 100 * 1000 + rows = [cols] * row_count + self.assertEqual(row_count, self.connect.createDataFrame(data=rows).count()) + def test_unsupported_jvm_attribute(self): # Unsupported jvm attributes for Spark session. unsupported_attrs = ["_jsc", "_jconf", "_jvm", "_jsparkSession"] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org