This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 675c99e55e [CORE] Make max broadcast table size configurable (#9359)
675c99e55e is described below

commit 675c99e55e53d6a37d3c8ca409a84f86d8093252
Author: Kent Yao <[email protected]>
AuthorDate: Fri Apr 18 17:10:50 2025 +0800

    [CORE] Make max broadcast table size configurable (#9359)
    
    Port apache/spark#50327 to make max broadcast table size configurable.
    
    Also, remove our hardcoded ones and make some custom spark build happy
---
 .../gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala       | 9 ++++++---
 .../apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala  | 9 ++++++---
 .../src/main/scala/org/apache/gluten/config/GlutenConfig.scala   | 6 +++++-
 3 files changed, 17 insertions(+), 7 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 8136488b6a..0ec44a207a 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -32,6 +32,7 @@ import org.apache.gluten.vectorized.{BlockOutputStream, 
CHColumnarBatchSerialize
 
 import org.apache.spark.ShuffleDependency
 import org.apache.spark.internal.Logging
+import org.apache.spark.memory.SparkMemoryUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{GenShuffleWriterParameters, 
GlutenShuffleWriterWrapper, HashPartitioningWrapper}
@@ -49,7 +50,7 @@ import 
org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, 
ClickHouseBuildSideRelation, HashedRelationBroadcastMode}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.utils.{CHExecUtil, PushDownUtil}
@@ -559,9 +560,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
     val batches = countsAndBytes.map(_._2)
     val totalBatchesSize = batches.map(_.length).sum
     val rawSize = dataSize.value
-    if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
+    if (rawSize >= GlutenConfig.get.maxBroadcastTableSize) {
       throw new GlutenException(
-        s"Cannot broadcast the table that is larger than 8GB: $rawSize bytes")
+        "Cannot broadcast the table that is larger than " +
+          
s"${SparkMemoryUtil.bytesToString(GlutenConfig.get.maxBroadcastTableSize)}: " +
+          s"${SparkMemoryUtil.bytesToString(rawSize)}")
     }
     if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0) {
       throw new GlutenException(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 77cb2d68be..0dc83e98d4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -28,6 +28,7 @@ import org.apache.gluten.vectorized.{ColumnarBatchSerializer, 
ColumnarBatchSeria
 
 import org.apache.spark.{ShuffleDependency, SparkException}
 import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec, 
PullOutArrowEvalPythonPreProjectHelper}
+import org.apache.spark.memory.SparkMemoryUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{GenShuffleWriterParameters, 
GlutenShuffleWriterWrapper}
@@ -41,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.FileFormat
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, 
HashedRelationBroadcastMode}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
@@ -649,9 +650,11 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       .filter(_.getNumRows != 0)
       .collect
     val rawSize = serialized.map(_.getSerialized.length).sum
-    if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
+    if (rawSize >= GlutenConfig.get.maxBroadcastTableSize) {
       throw new SparkException(
-        s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} 
GB")
+        "Cannot broadcast the table that is larger than " +
+          
s"${SparkMemoryUtil.bytesToString(GlutenConfig.get.maxBroadcastTableSize)}: " +
+          s"${SparkMemoryUtil.bytesToString(rawSize)}")
     }
     numOutputRows += serialized.map(_.getNumRows).sum
     dataSize += rawSize
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 449db38e24..22d269cc25 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.config
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf, 
SQLConfProvider}
 
 import com.google.common.collect.ImmutableList
@@ -364,6 +364,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED)
   def enableColumnarCollectLimit: Boolean = 
getConf(COLUMNAR_COLLECT_LIMIT_ENABLED)
   def getSupportedFlattenedExpressions: String = 
getConf(GLUTEN_SUPPORTED_FLATTENED_FUNCTIONS)
+
+  def maxBroadcastTableSize: Long =
+    
JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, 
"8GB"))
 }
 
 object GlutenConfig {
@@ -442,6 +445,7 @@ object GlutenConfig {
   val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = 
"spark.shuffle.spill.diskWriteBufferSize"
   val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
   val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true
+  val SPARK_MAX_BROADCAST_TABLE_SIZE = "spark.sql.maxBroadcastTableSize"
 
   def get: GlutenConfig = {
     new GlutenConfig(SQLConf.get)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to