This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 669e63a34012 [SPARK-49673][CONNECT] Increase 
CONNECT_GRPC_ARROW_MAX_BATCH_SIZE to 0.7 * CONNECT_GRPC_MAX_MESSAGE_SIZE
669e63a34012 is described below

commit 669e63a34012404d8d864cd6294f799b672f6f9a
Author: Robert Dillitz <[email protected]>
AuthorDate: Thu Sep 19 08:54:20 2024 +0900

    [SPARK-49673][CONNECT] Increase CONNECT_GRPC_ARROW_MAX_BATCH_SIZE to 0.7 * 
CONNECT_GRPC_MAX_MESSAGE_SIZE
    
    ### What changes were proposed in this pull request?
    Increases the default `maxBatchSize` from 4MiB * 0.7 to 128MiB (=
    CONNECT_GRPC_MAX_MESSAGE_SIZE) * 0.7. This makes better use of the allowed 
maximum message size.
    This limit is used when creating Arrow batches for the `SqlCommandResult` 
in the `SparkConnectPlanner` and for `ExecutePlanResponse.ArrowBatch` in 
`processAsArrowBatches`. This lets us return much larger `LocalRelations` in 
the `SqlCommandResult` (e.g., for the `SHOW PARTITIONS` command) while still 
staying within the gRPC message size limit.
    
    ### Why are the changes needed?
    There are `SqlCommandResults` that exceed 0.7 * 4MiB.
    
    ### Does this PR introduce _any_ user-facing change?
    `SqlCommandResults` of up to 0.7 * 128MiB are now supported, instead of 
only up to 0.7 * 4MiB, and `ExecutePlanResponses` now make better use of the 
128MiB message size limit.
    
    ### How was this patch tested?
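    Existing tests, plus a new end-to-end test in `ClientE2ETestSuite` 
("SPARK-49673: new batch size, multiple batches").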
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #48122 from dillitz/increase-sql-command-batch-size.
    
    Authored-by: Robert Dillitz <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  | 23 ++++++++++++++++++++--
 .../apache/spark/sql/test/RemoteSparkSession.scala |  2 ++
 .../apache/spark/sql/connect/config/Connect.scala  |  2 +-
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 52cdbd47357f..b47231948dc9 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 import scala.jdk.CollectionConverters._
 
 import org.apache.commons.io.FileUtils
@@ -37,7 +37,7 @@ import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
+import org.apache.spark.sql.connect.client.{RetryPolicy, SparkConnectClient, 
SparkResult}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils, 
RemoteSparkSession, SQLHelper}
@@ -1566,6 +1566,25 @@ class ClientE2ETestSuite
     val result = df.select(trim(col("col"), " ").as("trimmed_col")).collect()
     assert(result sameElements Array(Row("a"), Row("b"), Row("c")))
   }
+
+  test("SPARK-49673: new batch size, multiple batches") {
+    val maxBatchSize = 
spark.conf.get("spark.connect.grpc.arrow.maxBatchSize").dropRight(1).toInt
+    // Adjust client grpcMaxMessageSize to maxBatchSize (10MiB; set in 
RemoteSparkSession config)
+    val sparkWithLowerMaxMessageSize = SparkSession
+      .builder()
+      .client(
+        SparkConnectClient
+          .builder()
+          .userId("test")
+          .port(port)
+          .grpcMaxMessageSize(maxBatchSize)
+          .retryPolicy(RetryPolicy
+            .defaultPolicy()
+            .copy(maxRetries = Some(10), maxBackoff = Some(FiniteDuration(30, 
"s"))))
+          .build())
+      .create()
+    assert(sparkWithLowerMaxMessageSize.range(maxBatchSize).collect().length 
== maxBatchSize)
+  }
 }
 
 private[sql] case class ClassData(a: String, b: Int)
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala
index e0de73e496d9..36aaa2cc7fbf 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala
@@ -124,6 +124,8 @@ object SparkConnectServerUtils {
       // to make the tests exercise reattach.
       "spark.connect.execute.reattachable.senderMaxStreamDuration=1s",
       "spark.connect.execute.reattachable.senderMaxStreamSize=123",
+      // Testing SPARK-49673, setting maxBatchSize to 10MiB
+      s"spark.connect.grpc.arrow.maxBatchSize=${10 * 1024 * 1024}",
       // Disable UI
       "spark.ui.enabled=false")
     Seq("--jars", catalystTestJar) ++ confs.flatMap(v => "--conf" :: v :: Nil)
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 92709ff29a1c..b64637f7d247 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -63,7 +63,7 @@ object Connect {
           "conservatively use 70% of it because the size is not accurate but 
estimated.")
       .version("3.4.0")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(4 * 1024 * 1024)
+      .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
 
   val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
     buildStaticConf("spark.connect.grpc.maxInboundMessageSize")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to