This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 6e7e50eaaf7 [SPARK-44657][CONNECT] Fix incorrect limit handling in
ArrowBatchWithSchemaIterator and config parsing of
CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
6e7e50eaaf7 is described below
commit 6e7e50eaaf7770d46474b5299ccf8724018f526a
Author: vicennial <[email protected]>
AuthorDate: Tue Aug 8 17:30:16 2023 +0900
[SPARK-44657][CONNECT] Fix incorrect limit handling in
ArrowBatchWithSchemaIterator and config parsing of
CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
Fixes the limit checking of `maxEstimatedBatchSize` and
`maxRecordsPerBatch` to respect the more restrictive limit and fixes the config
parsing of `CONNECT_GRPC_ARROW_MAX_BATCH_SIZE` by converting the value to bytes.
Bugfix.
In the arrow writer
[code](https://github.com/apache/spark/blob/6161bf44f40f8146ea4c115c788fd4eaeb128769/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala#L154-L163)
, the conditions don’t seem to match what the documentation says regarding
"maxBatchSize and maxRecordsPerBatch, respect whatever smaller", since the code
actually respects whichever conf is "larger" (i.e., less restrictive) due to the ||
operator.
Further, when the `CONNECT_GRPC_ARROW_MAX_BATCH_SIZE` conf is read, the
value is not converted to bytes from MiB
([example](https://github.com/apache/spark/blob/3e5203c64c06cc8a8560dfa0fb6f52e74589b583/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L103)).
No.
Existing tests.
Closes #42321 from vicennial/SPARK-44657.
Authored-by: vicennial <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit f9d417fc17a82ddf02d6bbab82abc8e1aa264356)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../apache/spark/sql/connect/config/Connect.scala | 10 +++++-----
.../spark/sql/execution/arrow/ArrowConverters.scala | 21 +++++++++++++--------
2 files changed, 18 insertions(+), 13 deletions(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 19fdad97b5f..2a5290d3469 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -40,12 +40,12 @@ object Connect {
val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE =
ConfigBuilder("spark.connect.grpc.arrow.maxBatchSize")
.doc(
- "When using Apache Arrow, limit the maximum size of one arrow batch
that " +
- "can be sent from server side to client side. Currently, we
conservatively use 70% " +
- "of it because the size is not accurate but estimated.")
+ "When using Apache Arrow, limit the maximum size of one arrow batch,
in bytes unless " +
+ "otherwise specified, that can be sent from server side to client
side. Currently, we " +
+ "conservatively use 70% of it because the size is not accurate but
estimated.")
.version("3.4.0")
- .bytesConf(ByteUnit.MiB)
- .createWithDefaultString("4m")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefault(4 * 1024 * 1024)
val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
ConfigBuilder("spark.connect.grpc.maxInboundMessageSize")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index b22c80d17e8..df26a06c86d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -145,17 +145,22 @@ private[sql] object ArrowConverters extends Logging {
// Always write the schema.
MessageSerializer.serialize(writeChannel, arrowSchema)
+ def isBatchSizeLimitExceeded: Boolean = {
+ // If `maxEstimatedBatchSize` is zero or negative, it implies
unlimited.
+ maxEstimatedBatchSize > 0 && estimatedBatchSize >=
maxEstimatedBatchSize
+ }
+ def isRecordLimitExceeded: Boolean = {
+ // If `maxRecordsPerBatch` is zero or negative, it implies unlimited.
+ maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch
+ }
// Always write the first row.
while (rowIter.hasNext && (
- // For maxBatchSize and maxRecordsPerBatch, respect whatever smaller.
// If the size in bytes is positive (set properly), always write the
first row.
- rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
- // If the size in bytes of rows are 0 or negative, unlimit it.
- estimatedBatchSize <= 0 ||
- estimatedBatchSize < maxEstimatedBatchSize ||
- // If the size of rows are 0 or negative, unlimit it.
- maxRecordsPerBatch <= 0 ||
- rowCountInLastBatch < maxRecordsPerBatch)) {
+ (rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0) ||
+ // If either limit is hit, create a batch. This implies that the
limit that is hit first
+ // triggers the creation of a batch even if the other limit is not
yet hit, hence
+ // preferring the more restrictive limit.
+ (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
val row = rowIter.next()
arrowWriter.write(row)
estimatedBatchSize += (row match {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]