This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new f35a55df3 [CELEBORN-858] Generate patch to each Spark 3.x minor version
f35a55df3 is described below

commit f35a55df3d8b42bde7a82b849d13943f23325197
Author: Luke Yan <[email protected]>
AuthorDate: Fri Nov 10 15:35:54 2023 +0800

    [CELEBORN-858] Generate patch to each Spark 3.x minor version
    
    Add the following patch files in the directory `incubator-celeborn/tree/spark3-patch/assets/spark-patch`:
    
    1. Celeborn_Dynamic_Allocation_spark3_0.patch
    2. Celeborn_Dynamic_Allocation_spark3_1.patch
    3. Celeborn_Dynamic_Allocation_spark3_2.patch
    4. Celeborn_Dynamic_Allocation_spark3_3.patch
    
    Delete the previous combined patch at the same time:
    
    1. Celeborn_Dynamic_Allocation_spark3.patch
    
    Modified the `Support Spark Dynamic Allocation` section in incubator-celeborn/README.md:
    
    
    ![image](https://github.com/apache/incubator-celeborn/assets/108530647/61e2e69b-d3f5-4d11-a20b-374622936443)
    
    This makes it convenient for users to apply the patch matching their Spark 3.x minor version for `Support Spark Dynamic Allocation`.
    
    no
    
    Yes. All patch files can be applied to the corresponding version of the Spark source code through `git apply` without any code conflicts.
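As a small illustration of the `git apply` workflow described above (the version string, paths, and naming helper below are hypothetical, not part of this commit):

```shell
#!/bin/sh
# Map a Spark version to the matching per-minor-version patch file name.
# Assumption: names follow Celeborn_Dynamic_Allocation_spark<major>_<minor>.patch
spark_version="3.2.4"                 # hypothetical example version
minor="${spark_version%.*}"           # strip the patch level -> 3.2
patch="Celeborn_Dynamic_Allocation_spark${minor%%.*}_${minor#*.}.patch"
echo "$patch"

# Then, from inside a Spark source checkout of that version (path assumed):
#   git apply /path/to/incubator-celeborn/assets/spark-patch/"$patch"
```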
    
    Closes #2085 from lukeyan2023/spark3-patch.
    
    Authored-by: Luke Yan <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit c7c2f6a35ae0bf5119df5e87ef7c389a09886e59)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 README.md                                          | 16 +++-
 ... => Celeborn_Dynamic_Allocation_spark3_0.patch} | 89 +++++++++++-----------
 ... => Celeborn_Dynamic_Allocation_spark3_1.patch} | 81 +++++++++++---------
 ... => Celeborn_Dynamic_Allocation_spark3_2.patch} |  0
 ... => Celeborn_Dynamic_Allocation_spark3_3.patch} | 86 +++++++++++----------
 5 files changed, 144 insertions(+), 128 deletions(-)

diff --git a/README.md b/README.md
index 108558fbd..be08653ff 100644
--- a/README.md
+++ b/README.md
@@ -302,10 +302,18 @@ Masters and works can be deployed on the same node but should not deploy multipl
 See more detail in [CONFIGURATIONS](docs/configuration.md)
 
 ### Support Spark Dynamic Allocation
-We provide a patch to enable users to use Spark with both Dynamic Resource Allocation(DRA) and Celeborn.
-For Spark2.x check [Spark2 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch).  
-For Spark3.x check [Spark3 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch).
-For Spark3.4 check [Spark3 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch).
+For Spark versions >= 3.5.0, Celeborn can be used with Dynamic Resource Allocation(DRA)
+when `spark.shuffle.sort.io.plugin.class` is set to `org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO`.
+Check [SPARK-42689](https://issues.apache.org/jira/browse/SPARK-42689) and [CELEBORN-911](https://issues.apache.org/jira/browse/CELEBORN-911)
+for more details.
+
+For Spark versions < 3.5.0, we provide a patch to enable users to use Spark with DRA and Celeborn.
+- For Spark 2.x check [Spark2 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch).
+- For Spark 3.0 check [Spark3.0 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch).
+- For Spark 3.1 check [Spark3.1 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch).
+- For Spark 3.2 check [Spark3.2 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_2.patch).
+- For Spark 3.3 check [Spark3.3 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch).
+- For Spark 3.4 check [Spark3.4 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch).
 
 ### Metrics
 Celeborn has various metrics. [METRICS](METRICS.md)
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch
similarity index 76%
copy from assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
copy to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch
index 5cebe83c7..59081a2e8 100644
--- a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch
@@ -13,74 +13,75 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-Index: core/src/main/scala/org/apache/spark/util/Utils.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
---- a/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala    (date 1652322540231)
-@@ -3222,6 +3222,9 @@
-     }
-     files.toSeq
-   }
-+
-+  def isCelebornEnabled(conf: SparkConf): Boolean =
-+    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
- }
- 
- private[util] object CallerContext extends Logging {
+Subject: [PATCH] [CORE][SHUFFLE] Support enabling DRA with Apache Celeborn
+---
 Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
 diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
---- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (date 1652322692344)
-@@ -211,7 +211,7 @@
-           (decommissionEnabled &&
-             conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
+@@ -198,7 +198,7 @@
+     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+       if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
 -      } else if (!testing) {
 +      } else if (!testing && !Utils.isCelebornEnabled(conf)) {
         throw new SparkException("Dynamic allocation of executors requires the external " +
           "shuffle service. You may enable this through spark.shuffle.service.enabled.")
       }
+Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
+@@ -1851,7 +1851,8 @@
+     // if the cluster manager explicitly tells us that the entire worker was lost, then
+     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
+     // from a Standalone cluster, where the shuffle service lives in the Worker.)
+-    val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
++    val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
++      (workerLost || !env.blockManager.externalShuffleServiceEnabled)
+     removeExecutorAndUnregisterOutputs(
+       execId = execId,
+       fileLost = fileLost,
 Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (date 1652323382069)
-@@ -1015,7 +1015,8 @@
-     // and we are not using an external shuffle server which could serve the shuffle outputs.
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
+@@ -944,7 +944,7 @@
      // The reason is the next stage wouldn't be able to fetch the data from this dead executor
      // so we would need to rerun these tasks on other executors.
--    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
-+    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie &&
-+      !Utils.isCelebornEnabled(conf)) {
+     if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
+-        && !isZombie) {
++        && !isZombie && !Utils.isCelebornEnabled(conf)) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         // We may have a running task whose partition has been marked as successful,
-Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+Index: core/src/main/scala/org/apache/spark/util/Utils.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (date 1652322695806)
-@@ -2231,7 +2231,8 @@
-     // if the cluster manager explicitly tells us that the entire worker was lost, then
-     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
-     // from a Standalone cluster, where the shuffle service lives in the Worker.)
--    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
-+    val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
-+      (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
-     removeExecutorAndUnregisterOutputs(
-       execId = execId,
-       fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
+--- a/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
++++ b/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
+@@ -2934,6 +2934,9 @@
+     props.forEach((k, v) => resultProps.put(k, v))
+     resultProps
+   }
++
++  def isCelebornEnabled(conf: SparkConf): Boolean =
++    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
+ }
+ 
+ private[util] object CallerContext extends Logging {
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch
similarity index 86%
copy from assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
copy to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch
index 5cebe83c7..8d9bfd6a0 100644
--- a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch
@@ -13,33 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-Index: core/src/main/scala/org/apache/spark/util/Utils.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
---- a/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala    (date 1652322540231)
-@@ -3222,6 +3222,9 @@
-     }
-     files.toSeq
-   }
-+
-+  def isCelebornEnabled(conf: SparkConf): Boolean =
-+    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
- }
- 
- private[util] object CallerContext extends Logging {
+Subject: [PATCH] [CORE][SHUFFLE] Support enabling DRA with Apache Celeborn
+---
 Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
 diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
---- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (date 1652322692344)
-@@ -211,7 +211,7 @@
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
+@@ -210,7 +210,7 @@
           (decommissionEnabled &&
             conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
@@ -48,15 +32,33 @@ diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
         throw new SparkException("Dynamic allocation of executors requires the external " +
           "shuffle service. You may enable this through spark.shuffle.service.enabled.")
       }
+Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
+@@ -2080,7 +2080,8 @@
+     // if the cluster manager explicitly tells us that the entire worker was lost, then
+     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
+     // from a Standalone cluster, where the shuffle service lives in the Worker.)
+-    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
++    val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
++      (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
+     removeExecutorAndUnregisterOutputs(
+       execId = execId,
+       fileLost = fileLost,
 Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (date 1652323382069)
-@@ -1015,7 +1015,8 @@
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
+@@ -973,7 +973,8 @@
      // and we are not using an external shuffle server which could serve the shuffle outputs.
      // The reason is the next stage wouldn't be able to fetch the data from this dead executor
      // so we would need to rerun these tasks on other executors.
@@ -66,21 +68,24 @@ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         // We may have a running task whose partition has been marked as successful,
-Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+Index: core/src/main/scala/org/apache/spark/util/Utils.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (date 1652322695806)
-@@ -2231,7 +2231,8 @@
-     // if the cluster manager explicitly tells us that the entire worker was lost, then
-     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
-     // from a Standalone cluster, where the shuffle service lives in the Worker.)
--    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
-+    val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
-+      (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
-     removeExecutorAndUnregisterOutputs(
-       execId = execId,
-       fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
+--- a/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
++++ b/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
+@@ -3057,7 +3057,12 @@
+       0
+     }
+   }
++
++  def isCelebornEnabled(conf: SparkConf): Boolean =
++    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
+ }
++
++
+ 
+ private[util] object CallerContext extends Logging {
+   val callerContextSupported: Boolean = {
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_2.patch
similarity index 100%
copy from assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
copy to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_2.patch
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch
similarity index 81%
rename from assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
rename to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch
index 5cebe83c7..e998e2e5f 100644
--- a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch
@@ -13,50 +13,52 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-Index: core/src/main/scala/org/apache/spark/util/Utils.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
---- a/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala    (date 1652322540231)
-@@ -3222,6 +3222,9 @@
-     }
-     files.toSeq
-   }
-+
-+  def isCelebornEnabled(conf: SparkConf): Boolean =
-+    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
- }
- 
- private[util] object CallerContext extends Logging {
+Subject: [PATCH] [CORE][SHUFFLE] Support enabling DRA with Apache Celeborn
+---
 Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
 diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
---- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (date 1652322692344)
-@@ -211,7 +211,7 @@
-           (decommissionEnabled &&
-             conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
-         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision 8c2b3319c6734250ff9d72f3d7e5cab56b142195)
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     (revision 5e318f8fbeadec03986b3ad9cad4c1cda33c9ed2)
+@@ -209,7 +209,7 @@
+       } else if (decommissionEnabled &&
+           conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+         logInfo("Shuffle data decommission is enabled without a shuffle service.")
 -      } else if (!testing) {
 +      } else if (!testing && !Utils.isCelebornEnabled(conf)) {
         throw new SparkException("Dynamic allocation of executors requires the external " +
           "shuffle service. You may enable this through spark.shuffle.service.enabled.")
       }
+Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision 8c2b3319c6734250ff9d72f3d7e5cab56b142195)
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision 5e318f8fbeadec03986b3ad9cad4c1cda33c9ed2)
+@@ -2414,7 +2414,8 @@
+     // if the cluster manager explicitly tells us that the entire worker was lost, then
+     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
+     // from a Standalone cluster, where the shuffle service lives in the Worker.)
+-    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
++    val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
++      (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
+     removeExecutorAndUnregisterOutputs(
+       execId = execId,
+       fileLost = fileLost,
 Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (date 1652323382069)
-@@ -1015,7 +1015,8 @@
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision 8c2b3319c6734250ff9d72f3d7e5cab56b142195)
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      (revision 5e318f8fbeadec03986b3ad9cad4c1cda33c9ed2)
+@@ -1032,7 +1032,8 @@
      // and we are not using an external shuffle server which could serve the shuffle outputs.
      // The reason is the next stage wouldn't be able to fetch the data from this dead executor
      // so we would need to rerun these tasks on other executors.
@@ -64,23 +66,23 @@ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 +    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie &&
 +      !Utils.isCelebornEnabled(conf)) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
-         val index = taskInfos(tid).index
+         val index = info.index
         // We may have a running task whose partition has been marked as successful,
-Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+Index: core/src/main/scala/org/apache/spark/util/Utils.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        (date 1652322695806)
-@@ -2231,7 +2231,8 @@
-     // if the cluster manager explicitly tells us that the entire worker was lost, then
-     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
-     // from a Standalone cluster, where the shuffle service lives in the Worker.)
--    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
-+    val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
-+      (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
-     removeExecutorAndUnregisterOutputs(
-       execId = execId,
-       fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
+--- a/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision 8c2b3319c6734250ff9d72f3d7e5cab56b142195)
++++ b/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision 5e318f8fbeadec03986b3ad9cad4c1cda33c9ed2)
+@@ -3246,6 +3246,9 @@
+     files.toSeq
+   }
+ 
++  def isCelebornEnabled(conf: SparkConf): Boolean =
++    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
++
+   /**
+    * Return the median number of a long array
+    *
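For completeness, the no-patch path for Spark versions >= 3.5.0 described in the README change above can be sketched as a `spark-defaults.conf` fragment. The shuffle manager class name and master endpoint shown here are illustrative assumptions, not taken from this commit; check the Celeborn documentation for your release:

```properties
# Celeborn shuffle manager (class name assumed for Celeborn 0.3.x)
spark.shuffle.manager                org.apache.spark.shuffle.celeborn.SparkShuffleManager
# Required for DRA on Spark >= 3.5.0 (per SPARK-42689 / CELEBORN-911)
spark.shuffle.sort.io.plugin.class   org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
spark.dynamicAllocation.enabled      true
# Placeholder Celeborn master address
spark.celeborn.master.endpoints      clb-master:9097
```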
