This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 42bc297d5b9 [SPARK-41433][CONNECT] Make Max Arrow BatchSize configurable
42bc297d5b9 is described below
commit 42bc297d5b9149730f91af26c857897bf4d6aa91
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Dec 7 16:30:28 2022 +0800
[SPARK-41433][CONNECT] Make Max Arrow BatchSize configurable
### What changes were proposed in this pull request?
Make Max Arrow BatchSize configurable
### Why are the changes needed?
The maximum Arrow batch size sent from the server to the client was hard-coded to 4MB; making it configurable lets deployments tune it.
### Does this PR introduce _any_ user-facing change?
Yes, one new configuration: spark.connect.grpc.arrow.maxBatchSize (a usage sketch follows the diffstat below).
### How was this patch tested?
Existing tests.
Closes #38958 from zhengruifeng/connect_config_batchsize.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../main/scala/org/apache/spark/sql/connect/config/Connect.scala | 9 +++++++++
.../spark/sql/connect/service/SparkConnectStreamHandler.scala | 9 ++++-----
2 files changed, 13 insertions(+), 5 deletions(-)
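As a usage sketch (values are assumptions, not part of this commit): the handler reads the new entry from SparkEnv's conf, so it can be supplied through a plain server-side SparkConf like any other Spark setting:

    import org.apache.spark.SparkConf

    // Hypothetical example: raise the per-batch limit from the "4m" default.
    // bytesConf entries accept byte-size strings such as "16m".
    val conf = new SparkConf()
      .set("spark.connect.grpc.arrow.maxBatchSize", "16m")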
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 76d159cfd15..a17e9784ec6 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.connect.config
import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
private[spark] object Connect {
@@ -34,4 +35,12 @@ private[spark] object Connect {
.version("3.4.0")
.stringConf
.createOptional
+
+ val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE =
+ ConfigBuilder("spark.connect.grpc.arrow.maxBatchSize")
+ .doc("When using Apache Arrow, limit the maximum size of one arrow batch that " +
+ "can be sent from server side to client side.")
+ .version("3.4.0")
+ .bytesConf(ByteUnit.MiB)
+ .createWithDefaultString("4m")
}
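For context on the builder calls above: Spark's bytesConf parses size strings with JavaUtils and returns the value expressed in the unit passed to the builder. A minimal standalone sketch of that parsing, assuming only spark-network-common on the classpath:

    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    // byteStringAs converts a size string into the requested unit, so the
    // default "4m" reads as 4 in MiB and as 4194304 in bytes.
    val inMiB: Long = JavaUtils.byteStringAs("4m", ByteUnit.MiB) // 4
    val inBytes: Long = JavaUtils.byteStringAs("4m", ByteUnit.BYTE) // 4194304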
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index b5d100e894d..fcae3501cef 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -23,11 +23,13 @@ import scala.util.control.NonFatal
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
+import org.apache.spark.SparkEnv
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
@@ -38,9 +40,6 @@ import org.apache.spark.util.ThreadUtils
class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResponse])
    extends Logging {
- // The maximum batch size in bytes for a single batch of data to be returned via proto.
- private val MAX_BATCH_SIZE: Long = 4 * 1024 * 1024
-
def handle(v: ExecutePlanRequest): Unit = {
val session =
SparkConnectService.getOrCreateIsolatedSession(v.getUserContext.getUserId).session
@@ -64,12 +63,12 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
val schema = dataframe.schema
val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+ // Conservatively sets it 70% because the size is not accurate but estimated.
+ val maxBatchSize = (SparkEnv.get.conf.get(CONNECT_GRPC_ARROW_MAX_BATCH_SIZE) * 0.7).toLong
SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
val rows = dataframe.queryExecution.executedPlan.execute()
val numPartitions = rows.getNumPartitions
- // Conservatively sets it 70% because the size is not accurate but estimated.
- val maxBatchSize = (MAX_BATCH_SIZE * 0.7).toLong
var numSent = 0
if (numPartitions > 0) {
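Putting the two hunks together, a condensed sketch of the new read path (it mirrors the diff above and assumes an initialized SparkEnv; not a definitive implementation):

    import org.apache.spark.SparkEnv
    import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE

    // Read the configured limit, then scale it by 70% because the Arrow batch
    // size is an estimate rather than an exact measurement.
    val configured: Long = SparkEnv.get.conf.get(CONNECT_GRPC_ARROW_MAX_BATCH_SIZE)
    val maxBatchSize: Long = (configured * 0.7).toLong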
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]