pan3793 commented on code in PR #2373: URL: https://github.com/apache/celeborn/pull/2373#discussion_r1950380522
########## assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch: ########## @@ -0,0 +1,78 @@ +From 39eeab2426f9676580e4e19c8b079e1967081c7d Mon Sep 17 00:00:00 2001 +From: wangshengjie <[email protected]> +Date: Sun, 24 Mar 2024 19:51:05 +0800 +Subject: [PATCH] [SQL] Handle skew partitions with Celeborn + +--- + .../org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++++ + .../execution/adaptive/ShufflePartitionsUtil.scala | 12 +++++++++++- + 2 files changed, 21 insertions(+), 1 deletion(-) + +diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +index af03ad9a4cb..1e55af89160 100644 +--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ++++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +@@ -3784,6 +3784,13 @@ object SQLConf { + .booleanConf + .createWithDefault(false) + ++ val CELEBORN_CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED = ++ buildConf("spark.celeborn.client.dataPushFailure.tracking.enabled") ++ .withAlternative("celeborn.client.dataPushFailure.tracking.enabled") ++ .version("3.1.2-mdh") ++ .booleanConf ++ .createWithDefault(false) ++ + /** + * Holds information about keys that have been deprecated. + * +@@ -4549,6 +4556,9 @@ class SQLConf extends Serializable with Logging { + def histogramNumericPropagateInputType: Boolean = + getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE) + ++ def isCelebornClientPushFailedTrackingEnabled: Boolean = getConf( ++ SQLConf.CELEBORN_CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED) ++ + /** ********************** SQLConf functionality methods ************ */ + + /** Set Spark SQL configuration properties. */ +diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +index af689db3379..7da6211e509 100644 +--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala ++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer + import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} + import org.apache.spark.internal.Logging + import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec} ++import org.apache.spark.sql.internal.SQLConf ++import org.apache.spark.util.Utils + + object ShufflePartitionsUtil extends Logging { + final val SMALL_PARTITION_FACTOR = 0.2 +@@ -387,6 +389,10 @@ object ShufflePartitionsUtil extends Logging { + val mapStartIndices = splitSizeListByTargetSize( + mapPartitionSizes, targetSize, smallPartitionFactor) + if (mapStartIndices.length > 1) { ++ // If Celeborn is enabled, split skew partitions without shuffle mapper-range reading ++ val splitSkewPartitionWithCeleborn = Utils.isCelebornEnabled(SparkEnv.get.conf) && ++ SQLConf.get.isCelebornClientPushFailedTrackingEnabled ++ + Some(mapStartIndices.indices.map { i => + val startMapIndex = mapStartIndices(i) + val endMapIndex = if (i == mapStartIndices.length - 1) { +@@ -400,7 +406,11 @@ object ShufflePartitionsUtil extends Logging { + dataSize += mapPartitionSizes(mapIndex) + mapIndex += 1 + } +- PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize) ++ if (splitSkewPartitionWithCeleborn) { ++ PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, dataSize) Review Comment: The `dataSize` would be collected by Spark metrics and affect the UI display. Technically, Celebron would split the partition into more even sub-partitions (correct me if I'm wrong), how about calculating the `dataSize` by `partitionSize / subPartitionNum`(except for the last one, should be `partitionSize - sumOfOtherSubPartitionSize`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
