This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 1e5f93912 [CELEBORN-753] Rename spark patch file name to make it more 
clear
1e5f93912 is described below

commit 1e5f93912eb9193bc3113953705521973d84ab24
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jun 30 11:41:12 2023 +0800

    [CELEBORN-753] Rename spark patch file name to make it more clear
    
    ### What changes were proposed in this pull request?
    Rename spark patch file name to make it more clear
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1666 from AngersZhuuuu/CELEBORN-753.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 6e35745736f7248ac4782958d79967c934f512df)
    Signed-off-by: mingji <[email protected]>
---
 README.md                                                         | 6 +++---
 ...huffle_spark3.patch => Celeborn_Columnar_Shuffle_spark3.patch} | 0
 ..._DRA_spark2.patch => Celeborn_Dynamic_Allocation_spark2.patch} | 8 ++++----
 ..._DRA_spark3.patch => Celeborn_Dynamic_Allocation_spark3.patch} | 8 ++++----
 ..._spark3_4.patch => Celeborn_Dynamic_Allocation_spark3_4.patch} | 8 ++++----
 5 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index ecb2bbd2b..764c1b33a 100644
--- a/README.md
+++ b/README.md
@@ -295,9 +295,9 @@ See more detail in [CONFIGURATIONS](docs/configuration.md)
 
 ### Support Spark Dynamic Allocation
 We provide a patch to enable users to use Spark with both Dynamic Resource 
Allocation(DRA) and Celeborn.
-For Spark2.x check [Spark2 Patch](assets/spark-patch/RSS_DRA_spark2.patch).  
-For Spark3.x check [Spark3 Patch](assets/spark-patch/RSS_DRA_spark3.patch).
-For Spark3.4 check [Spark3 Patch](assets/spark-patch/RSS_DRA_spark3_4.patch).
+For Spark2.x check [Spark2 
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch).  
+For Spark3.x check [Spark3 
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch).
+For Spark3.4 check [Spark3.4 
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch).
 
 ### Metrics
 Celeborn has various metrics. [METRICS](METRICS.md)
diff --git a/assets/spark-patch/RSS_Columnar_Shuffle_spark3.patch 
b/assets/spark-patch/Celeborn_Columnar_Shuffle_spark3.patch
similarity index 100%
rename from assets/spark-patch/RSS_Columnar_Shuffle_spark3.patch
rename to assets/spark-patch/Celeborn_Columnar_Shuffle_spark3.patch
diff --git a/assets/spark-patch/RSS_DRA_spark2.patch 
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
similarity index 95%
rename from assets/spark-patch/RSS_DRA_spark2.patch
rename to assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
index b612511de..325f1211f 100644
--- a/assets/spark-patch/RSS_DRA_spark2.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
@@ -40,7 +40,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
 -      throw new SparkException("Dynamic allocation of executors requires the 
external " +
 -        "shuffle service. You may enable this through 
spark.shuffle.service.enabled.")
 +    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
-+      !Utils.isRssEnabled(conf) && !testing) {
++      !Utils.isCelebornEnabled(conf) && !testing) {
 +      throw new SparkException("Dynamic allocation of executors requires the 
external or remote " +
 +        "shuffle service. You may enable this through 
spark.shuffle.service.enabled or " +
 +        "set 
spark.shuffle.manager=org.apache.spark.shuffle.celeborn.RssShuffleManager.")
@@ -60,7 +60,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b
      // we know to unregister shuffle output.  (Note that "worker" 
specifically refers to the process
      // from a Standalone cluster, where the shuffle service lives in the 
Worker.)
 -    val fileLost = workerLost || 
!env.blockManager.externalShuffleServiceEnabled
-+    val fileLost = !Utils.isRssEnabled(sc.getConf) && (workerLost || 
!env.blockManager.externalShuffleServiceEnabled)
++    val fileLost = !Utils.isCelebornEnabled(sc.getConf) && (workerLost || 
!env.blockManager.externalShuffleServiceEnabled)
      removeExecutorAndUnregisterOutputs(
        execId = execId,
        fileLost = fileLost,
@@ -80,7 +80,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 -        && !isZombie) {
 +    if (tasks(0).isInstanceOf[ShuffleMapTask]
 +      && !env.blockManager.externalShuffleServiceEnabled
-+      && !Utils.isRssEnabled(conf)
++      && !Utils.isCelebornEnabled(conf)
 +      && !isZombie) {
        for ((tid, info) <- taskInfos if info.executorId == execId) {
          val index = taskInfos(tid).index
@@ -98,7 +98,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/ma
      resultProps
    }
 +
-+  def isRssEnabled(conf: SparkConf): Boolean =
++  def isCelebornEnabled(conf: SparkConf): Boolean =
 +    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
 +
  }
diff --git a/assets/spark-patch/RSS_DRA_spark3.patch 
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
similarity index 95%
rename from assets/spark-patch/RSS_DRA_spark3.patch
rename to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
index d07bcb46c..5cebe83c7 100644
--- a/assets/spark-patch/RSS_DRA_spark3.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
@@ -26,7 +26,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/ma
      files.toSeq
    }
 +
-+  def isRssEnabled(conf: SparkConf): Boolean =
++  def isCelebornEnabled(conf: SparkConf): Boolean =
 +    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
  }
  
@@ -44,7 +44,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
              conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
          logWarning("Dynamic allocation without a shuffle service is an 
experimental feature.")
 -      } else if (!testing) {
-+      } else if (!testing && !Utils.isRssEnabled(conf)) {
++      } else if (!testing && !Utils.isCelebornEnabled(conf)) {
          throw new SparkException("Dynamic allocation of executors requires 
the external " +
            "shuffle service. You may enable this through 
spark.shuffle.service.enabled.")
        }
@@ -62,7 +62,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
      // so we would need to rerun these tasks on other executors.
 -    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled 
&& !isZombie) {
 +    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled 
&& !isZombie &&
-+      !Utils.isRssEnabled(conf)) {
++      !Utils.isCelebornEnabled(conf)) {
        for ((tid, info) <- taskInfos if info.executorId == execId) {
          val index = taskInfos(tid).index
          // We may have a running task whose partition has been marked as 
successful,
@@ -79,7 +79,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b
      // we know to unregister shuffle output.  (Note that "worker" 
specifically refers to the process
      // from a Standalone cluster, where the shuffle service lives in the 
Worker.)
 -    val fileLost = workerHost.isDefined || 
!env.blockManager.externalShuffleServiceEnabled
-+    val fileLost = !Utils.isRssEnabled(sc.getConf) &&
++    val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
 +      (workerHost.isDefined || 
!env.blockManager.externalShuffleServiceEnabled)
      removeExecutorAndUnregisterOutputs(
        execId = execId,
diff --git a/assets/spark-patch/RSS_DRA_spark3_4.patch 
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
similarity index 93%
rename from assets/spark-patch/RSS_DRA_spark3_4.patch
rename to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
index 9189fd2d9..38c36ba45 100644
--- a/assets/spark-patch/RSS_DRA_spark3_4.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
@@ -28,7 +28,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
            conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
          logInfo("Shuffle data decommission is enabled without a shuffle 
service.")
 -      } else if (!testing) {
-+      } else if (!testing && !Utils.isRssEnabled(conf)) {
++      } else if (!testing && !Utils.isCelebornEnabled(conf)) {
          throw new SparkException("Dynamic allocation of executors requires 
the external " +
            "shuffle service. You may enable this through 
spark.shuffle.service.enabled.")
        }
@@ -45,7 +45,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b
      // we know to unregister shuffle output.  (Note that "worker" 
specifically refers to the process
      // from a Standalone cluster, where the shuffle service lives in the 
Worker.)
 -    val fileLost = workerHost.isDefined || 
!env.blockManager.externalShuffleServiceEnabled
-+    val fileLost = !Utils.isRssEnabled(sc.getConf) && (workerHost.isDefined 
|| !env.blockManager.externalShuffleServiceEnabled)
++    val fileLost = !Utils.isCelebornEnabled(sc.getConf) && 
(workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
      removeExecutorAndUnregisterOutputs(
        execId = execId,
        fileLost = fileLost,
@@ -61,7 +61,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/ma
      files.toSeq
    }
 
-+  def isRssEnabled(conf: SparkConf): Boolean =
++  def isCelebornEnabled(conf: SparkConf): Boolean =
 +    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
 +
    /**
@@ -80,7 +80,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
      val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
        (reason.isInstanceOf[ExecutorDecommission] || 
!env.blockManager.externalShuffleServiceEnabled)
 -    if (maybeShuffleMapOutputLoss && !isZombie) {
-+    if (maybeShuffleMapOutputLoss && !isZombie && !Utils.isRssEnabled(conf)) {
++    if (maybeShuffleMapOutputLoss && !isZombie && 
!Utils.isCelebornEnabled(conf)) {
        for ((tid, info) <- taskInfos if info.executorId == execId) {
          val index = info.index
          lazy val isShuffleMapOutputAvailable = reason match {

Reply via email to