This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 669e63a34012 [SPARK-49673][CONNECT] Increase
CONNECT_GRPC_ARROW_MAX_BATCH_SIZE to 0.7 * CONNECT_GRPC_MAX_MESSAGE_SIZE
669e63a34012 is described below
commit 669e63a34012404d8d864cd6294f799b672f6f9a
Author: Robert Dillitz <[email protected]>
AuthorDate: Thu Sep 19 08:54:20 2024 +0900
[SPARK-49673][CONNECT] Increase CONNECT_GRPC_ARROW_MAX_BATCH_SIZE to 0.7 *
CONNECT_GRPC_MAX_MESSAGE_SIZE
### What changes were proposed in this pull request?
Increases the default `maxBatchSize` from 0.7 * 4MiB to 0.7 * 128MiB
(128MiB = CONNECT_GRPC_MAX_MESSAGE_SIZE). This makes better use of the allowed
maximum message size.
This limit is used when creating Arrow batches for the `SqlCommandResult`
in the `SparkConnectPlanner` and for `ExecutePlanResponse.ArrowBatch` in
`processAsArrowBatches`. This, for example, lets us return much larger
`LocalRelations` in the `SqlCommandResult` (i.e., for the `SHOW PARTITIONS`
command) while still staying within the GRPC message size limit.
### Why are the changes needed?
There are `SqlCommandResults` that exceed 0.7 * 4MiB.
### Does this PR introduce _any_ user-facing change?
Now supports `SqlCommandResults` <= 0.7 * 128MiB instead of only <= 0.7 *
4MiB, and `ExecutePlanResponse`s will now make better use of the 128MiB limit.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48122 from dillitz/increase-sql-command-batch-size.
Authored-by: Robert Dillitz <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 23 ++++++++++++++++++++--
.../apache/spark/sql/test/RemoteSparkSession.scala | 2 ++
.../apache/spark/sql/connect/config/Connect.scala | 2 +-
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 52cdbd47357f..b47231948dc9 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -23,7 +23,7 @@ import java.util.Properties
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters._
import org.apache.commons.io.FileUtils
@@ -37,7 +37,7 @@ import
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
+import org.apache.spark.sql.connect.client.{RetryPolicy, SparkConnectClient,
SparkResult}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils,
RemoteSparkSession, SQLHelper}
@@ -1566,6 +1566,25 @@ class ClientE2ETestSuite
val result = df.select(trim(col("col"), " ").as("trimmed_col")).collect()
assert(result sameElements Array(Row("a"), Row("b"), Row("c")))
}
+
+ test("SPARK-49673: new batch size, multiple batches") {
+ val maxBatchSize =
spark.conf.get("spark.connect.grpc.arrow.maxBatchSize").dropRight(1).toInt
+ // Adjust client grpcMaxMessageSize to maxBatchSize (10MiB; set in
RemoteSparkSession config)
+ val sparkWithLowerMaxMessageSize = SparkSession
+ .builder()
+ .client(
+ SparkConnectClient
+ .builder()
+ .userId("test")
+ .port(port)
+ .grpcMaxMessageSize(maxBatchSize)
+ .retryPolicy(RetryPolicy
+ .defaultPolicy()
+ .copy(maxRetries = Some(10), maxBackoff = Some(FiniteDuration(30,
"s"))))
+ .build())
+ .create()
+ assert(sparkWithLowerMaxMessageSize.range(maxBatchSize).collect().length
== maxBatchSize)
+ }
}
private[sql] case class ClassData(a: String, b: Int)
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala
index e0de73e496d9..36aaa2cc7fbf 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala
@@ -124,6 +124,8 @@ object SparkConnectServerUtils {
// to make the tests exercise reattach.
"spark.connect.execute.reattachable.senderMaxStreamDuration=1s",
"spark.connect.execute.reattachable.senderMaxStreamSize=123",
+ // Testing SPARK-49673, setting maxBatchSize to 10MiB
+ s"spark.connect.grpc.arrow.maxBatchSize=${10 * 1024 * 1024}",
// Disable UI
"spark.ui.enabled=false")
Seq("--jars", catalystTestJar) ++ confs.flatMap(v => "--conf" :: v :: Nil)
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 92709ff29a1c..b64637f7d247 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -63,7 +63,7 @@ object Connect {
"conservatively use 70% of it because the size is not accurate but
estimated.")
.version("3.4.0")
.bytesConf(ByteUnit.BYTE)
- .createWithDefault(4 * 1024 * 1024)
+ .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
buildStaticConf("spark.connect.grpc.maxInboundMessageSize")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]