This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 1e5f93912 [CELEBORN-753] Rename spark patch file name to make it more
clear
1e5f93912 is described below
commit 1e5f93912eb9193bc3113953705521973d84ab24
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jun 30 11:41:12 2023 +0800
[CELEBORN-753] Rename spark patch file name to make it more clear
### What changes were proposed in this pull request?
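Rename the Spark patch files so that their names are clearer: the `RSS_` prefix becomes `Celeborn_`, `DRA` is spelled out as `Dynamic_Allocation`, and the `Utils.isRssEnabled` helper inside the dynamic-allocation patches is renamed to `Utils.isCelebornEnabled`.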
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1666 from AngersZhuuuu/CELEBORN-753.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 6e35745736f7248ac4782958d79967c934f512df)
Signed-off-by: mingji <[email protected]>
---
README.md | 6 +++---
...huffle_spark3.patch => Celeborn_Columnar_Shuffle_spark3.patch} | 0
..._DRA_spark2.patch => Celeborn_Dynamic_Allocation_spark2.patch} | 8 ++++----
..._DRA_spark3.patch => Celeborn_Dynamic_Allocation_spark3.patch} | 8 ++++----
..._spark3_4.patch => Celeborn_Dynamic_Allocation_spark3_4.patch} | 8 ++++----
5 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/README.md b/README.md
index ecb2bbd2b..764c1b33a 100644
--- a/README.md
+++ b/README.md
@@ -295,9 +295,9 @@ See more detail in [CONFIGURATIONS](docs/configuration.md)
### Support Spark Dynamic Allocation
We provide a patch to enable users to use Spark with both Dynamic Resource
Allocation(DRA) and Celeborn.
-For Spark2.x check [Spark2 Patch](assets/spark-patch/RSS_DRA_spark2.patch).
-For Spark3.x check [Spark3 Patch](assets/spark-patch/RSS_DRA_spark3.patch).
-For Spark3.4 check [Spark3 Patch](assets/spark-patch/RSS_DRA_spark3_4.patch).
+For Spark2.x check [Spark2 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch).
+For Spark3.x check [Spark3 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch).
+For Spark3.4 check [Spark3 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch).
### Metrics
Celeborn has various metrics. [METRICS](METRICS.md)
diff --git a/assets/spark-patch/RSS_Columnar_Shuffle_spark3.patch
b/assets/spark-patch/Celeborn_Columnar_Shuffle_spark3.patch
similarity index 100%
rename from assets/spark-patch/RSS_Columnar_Shuffle_spark3.patch
rename to assets/spark-patch/Celeborn_Columnar_Shuffle_spark3.patch
diff --git a/assets/spark-patch/RSS_DRA_spark2.patch
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
similarity index 95%
rename from assets/spark-patch/RSS_DRA_spark2.patch
rename to assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
index b612511de..325f1211f 100644
--- a/assets/spark-patch/RSS_DRA_spark2.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
@@ -40,7 +40,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
- throw new SparkException("Dynamic allocation of executors requires the
external " +
- "shuffle service. You may enable this through
spark.shuffle.service.enabled.")
+ if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
-+ !Utils.isRssEnabled(conf) && !testing) {
++ !Utils.isCelebornEnabled(conf) && !testing) {
+ throw new SparkException("Dynamic allocation of executors requires the
external or remote " +
+ "shuffle service. You may enable this through
spark.shuffle.service.enabled or " +
+ "set
spark.shuffle.manager=org.apache.spark.shuffle.celeborn.RssShuffleManager.")
@@ -60,7 +60,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b
// we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the
Worker.)
- val fileLost = workerLost ||
!env.blockManager.externalShuffleServiceEnabled
-+ val fileLost = !Utils.isRssEnabled(sc.getConf) && (workerLost ||
!env.blockManager.externalShuffleServiceEnabled)
++ val fileLost = !Utils.isCelebornEnabled(sc.getConf) && (workerLost ||
!env.blockManager.externalShuffleServiceEnabled)
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
@@ -80,7 +80,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
- && !isZombie) {
+ if (tasks(0).isInstanceOf[ShuffleMapTask]
+ && !env.blockManager.externalShuffleServiceEnabled
-+ && !Utils.isRssEnabled(conf)
++ && !Utils.isCelebornEnabled(conf)
+ && !isZombie) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
@@ -98,7 +98,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/ma
resultProps
}
+
-+ def isRssEnabled(conf: SparkConf): Boolean =
++ def isCelebornEnabled(conf: SparkConf): Boolean =
+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
+
}
diff --git a/assets/spark-patch/RSS_DRA_spark3.patch
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
similarity index 95%
rename from assets/spark-patch/RSS_DRA_spark3.patch
rename to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
index d07bcb46c..5cebe83c7 100644
--- a/assets/spark-patch/RSS_DRA_spark3.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
@@ -26,7 +26,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/ma
files.toSeq
}
+
-+ def isRssEnabled(conf: SparkConf): Boolean =
++ def isCelebornEnabled(conf: SparkConf): Boolean =
+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
}
@@ -44,7 +44,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
logWarning("Dynamic allocation without a shuffle service is an
experimental feature.")
- } else if (!testing) {
-+ } else if (!testing && !Utils.isRssEnabled(conf)) {
++ } else if (!testing && !Utils.isCelebornEnabled(conf)) {
throw new SparkException("Dynamic allocation of executors requires
the external " +
"shuffle service. You may enable this through
spark.shuffle.service.enabled.")
}
@@ -62,7 +62,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
// so we would need to rerun these tasks on other executors.
- if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled
&& !isZombie) {
+ if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled
&& !isZombie &&
-+ !Utils.isRssEnabled(conf)) {
++ !Utils.isCelebornEnabled(conf)) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
// We may have a running task whose partition has been marked as
successful,
@@ -79,7 +79,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b
// we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the
Worker.)
- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
-+ val fileLost = !Utils.isRssEnabled(sc.getConf) &&
++ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
+ (workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled)
removeExecutorAndUnregisterOutputs(
execId = execId,
diff --git a/assets/spark-patch/RSS_DRA_spark3_4.patch
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
similarity index 93%
rename from assets/spark-patch/RSS_DRA_spark3_4.patch
rename to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
index 9189fd2d9..38c36ba45 100644
--- a/assets/spark-patch/RSS_DRA_spark3_4.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
@@ -28,7 +28,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
logInfo("Shuffle data decommission is enabled without a shuffle
service.")
- } else if (!testing) {
-+ } else if (!testing && !Utils.isRssEnabled(conf)) {
++ } else if (!testing && !Utils.isCelebornEnabled(conf)) {
throw new SparkException("Dynamic allocation of executors requires
the external " +
"shuffle service. You may enable this through
spark.shuffle.service.enabled.")
}
@@ -45,7 +45,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b
// we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the
Worker.)
- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
-+ val fileLost = !Utils.isRssEnabled(sc.getConf) && (workerHost.isDefined
|| !env.blockManager.externalShuffleServiceEnabled)
++ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
(workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
@@ -61,7 +61,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/ma
files.toSeq
}
-+ def isRssEnabled(conf: SparkConf): Boolean =
++ def isCelebornEnabled(conf: SparkConf): Boolean =
+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
+
/**
@@ -80,7 +80,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
(reason.isInstanceOf[ExecutorDecommission] ||
!env.blockManager.externalShuffleServiceEnabled)
- if (maybeShuffleMapOutputLoss && !isZombie) {
-+ if (maybeShuffleMapOutputLoss && !isZombie && !Utils.isRssEnabled(conf)) {
++ if (maybeShuffleMapOutputLoss && !isZombie &&
!Utils.isCelebornEnabled(conf)) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = info.index
lazy val isShuffleMapOutputAvailable = reason match {