s0nskar commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1552832095


##########
assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch:
##########
@@ -0,0 +1,78 @@
+From 39eeab2426f9676580e4e19c8b079e1967081c7d Mon Sep 17 00:00:00 2001
+From: wangshengjie <[email protected]>
+Date: Sun, 24 Mar 2024 19:51:05 +0800
+Subject: [PATCH] [SQL] Handle skew partitions with Celeborn
+
+---
+ .../org/apache/spark/sql/internal/SQLConf.scala      | 10 ++++++++++
+ .../execution/adaptive/ShufflePartitionsUtil.scala   | 12 +++++++++++-
+ 2 files changed, 21 insertions(+), 1 deletion(-)
+
+diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+index af03ad9a4cb..1e55af89160 100644
+--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
++++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+@@ -3784,6 +3784,13 @@ object SQLConf {
+     .booleanConf
+     .createWithDefault(false)
+ 
++  val CELEBORN_CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED =
++    buildConf("spark.celeborn.client.dataPushFailure.tracking.enabled")
++      .withAlternative("celeborn.client.dataPushFailure.tracking.enabled")
++      .version("3.1.2-mdh")
++      .booleanConf
++      .createWithDefault(false)
++
+   /**
+    * Holds information about keys that have been deprecated.
+    *
+@@ -4549,6 +4556,9 @@ class SQLConf extends Serializable with Logging {
+   def histogramNumericPropagateInputType: Boolean =
+     getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE)
+ 
++  def isCelebornClientPushFailedTrackingEnabled: Boolean = getConf(
++    SQLConf.CELEBORN_CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED)
++
+   /** ********************** SQLConf functionality methods ************ */
+ 
+   /** Set Spark SQL configuration properties. */
+diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+index af689db3379..7da6211e509 100644
+--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
++++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, 
SparkEnv}
+ import org.apache.spark.internal.Logging
+ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, 
PartialReducerPartitionSpec, ShufflePartitionSpec}
++import org.apache.spark.sql.internal.SQLConf
++import org.apache.spark.util.Utils
+ 
+ object ShufflePartitionsUtil extends Logging {
+   final val SMALL_PARTITION_FACTOR = 0.2
+@@ -387,6 +389,10 @@ object ShufflePartitionsUtil extends Logging {
+     val mapStartIndices = splitSizeListByTargetSize(
+       mapPartitionSizes, targetSize, smallPartitionFactor)
+     if (mapStartIndices.length > 1) {
++      // If Celeborn is enabled, split skew partitions without shuffle 
mapper-range reading
++      val splitSkewPartitionWithCeleborn = 
Utils.isCelebornEnabled(SparkEnv.get.conf) &&
++        SQLConf.get.isCelebornClientPushFailedTrackingEnabled
++
+       Some(mapStartIndices.indices.map { i =>
+         val startMapIndex = mapStartIndices(i)
+         val endMapIndex = if (i == mapStartIndices.length - 1) {
+@@ -400,7 +406,11 @@ object ShufflePartitionsUtil extends Logging {
+           dataSize += mapPartitionSizes(mapIndex)
+           mapIndex += 1
+         }
+-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, 
dataSize)
++        if (splitSkewPartitionWithCeleborn) {
++          PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, 
dataSize)

Review Comment:
   Maybe we can add a note here that these `dataSize` values will not be 
accurate. The current downstream code only uses the sum of the `dataSize` 
values, which should still be equal, but someone might use them differently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to