This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c7c2f6a35 [CELEBORN-858] Generate patch to each Spark 3.x minor version
c7c2f6a35 is described below
commit c7c2f6a35ae0bf5119df5e87ef7c389a09886e59
Author: Luke Yan <[email protected]>
AuthorDate: Fri Nov 10 15:35:54 2023 +0800
[CELEBORN-858] Generate patch to each Spark 3.x minor version
### What changes were proposed in this pull request?
Add the following patch files in directory
`incubator-celeborn/tree/spark3-patch/assets/spark-patch` :
1. Celeborn_Dynamic_Allocation_spark3_0.patch
2. Celeborn_Dynamic_Allocation_spark3_1.patch
3. Celeborn_Dynamic_Allocation_spark3_2.patch
4. Celeborn_Dynamic_Allocation_spark3_3.patch
Delete a patch at the same time:
1. Celeborn_Dynamic_Allocation_spark3.patch
Modified the `Support Spark Dynamic Allocation` section in
incubator-celeborn/README.md.

### Why are the changes needed?
This makes it convenient for users to apply the patch matching their Spark 3.x
minor version to enable `Support Spark Dynamic Allocation`.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Yes. All patch files can be applied to the corresponding version of the Spark
source code through `git apply` without any code conflicts.
Closes #2085 from lukeyan2023/spark3-patch.
Authored-by: Luke Yan <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
README.md | 5 +-
... => Celeborn_Dynamic_Allocation_spark3_0.patch} | 89 +++++++++++-----------
... => Celeborn_Dynamic_Allocation_spark3_1.patch} | 81 +++++++++++---------
... => Celeborn_Dynamic_Allocation_spark3_2.patch} | 0
... => Celeborn_Dynamic_Allocation_spark3_3.patch} | 86 +++++++++++----------
5 files changed, 136 insertions(+), 125 deletions(-)
diff --git a/README.md b/README.md
index e1d06d177..c88cb05c3 100644
--- a/README.md
+++ b/README.md
@@ -333,7 +333,10 @@ for more details.
For Spark versions < 3.5.0, we provide a patch to enable users to use Spark
with DRA and Celeborn.
- For Spark 2.x check [Spark2
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch).
-- For Spark 3.0-3.3 check [Spark3
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch).
+- For Spark 3.0 check [Spark3.0
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch).
+- For Spark 3.1 check [Spark3.1
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch).
+- For Spark 3.2 check [Spark3.2
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_2.patch).
+- For Spark 3.3 check [Spark3.3
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch).
- For Spark 3.4 check [Spark3.4
Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch).
### Metrics
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch
similarity index 76%
copy from assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
copy to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch
index 5cebe83c7..59081a2e8 100644
--- a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch
@@ -13,74 +13,75 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Index: core/src/main/scala/org/apache/spark/util/Utils.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
---- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (date
1652322540231)
-@@ -3222,6 +3222,9 @@
- }
- files.toSeq
- }
-+
-+ def isCelebornEnabled(conf: SparkConf): Boolean =
-+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
- }
-
- private[util] object CallerContext extends Logging {
+Subject: [PATCH] [CORE][SHUFFLE] Support enabling DRA with Apache Celeborn
+---
Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
---- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(date 1652322692344)
-@@ -211,7 +211,7 @@
- (decommissionEnabled &&
- conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
+@@ -198,7 +198,7 @@
+ if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+ if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
logWarning("Dynamic allocation without a shuffle service is an
experimental feature.")
- } else if (!testing) {
+ } else if (!testing && !Utils.isCelebornEnabled(conf)) {
throw new SparkException("Dynamic allocation of executors requires
the external " +
"shuffle service. You may enable this through
spark.shuffle.service.enabled.")
}
+Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
+@@ -1851,7 +1851,8 @@
+ // if the cluster manager explicitly tells us that the entire worker was
lost, then
+ // we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
+ // from a Standalone cluster, where the shuffle service lives in the
Worker.)
+- val fileLost = workerLost ||
!env.blockManager.externalShuffleServiceEnabled
++ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
++ (workerLost || !env.blockManager.externalShuffleServiceEnabled)
+ removeExecutorAndUnregisterOutputs(
+ execId = execId,
+ fileLost = fileLost,
Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(date 1652323382069)
-@@ -1015,7 +1015,8 @@
- // and we are not using an external shuffle server which could serve the
shuffle outputs.
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
+@@ -944,7 +944,7 @@
// The reason is the next stage wouldn't be able to fetch the data from
this dead executor
// so we would need to rerun these tasks on other executors.
-- if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled
&& !isZombie) {
-+ if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled
&& !isZombie &&
-+ !Utils.isCelebornEnabled(conf)) {
+ if (tasks(0).isInstanceOf[ShuffleMapTask] &&
!env.blockManager.externalShuffleServiceEnabled
+- && !isZombie) {
++ && !isZombie && !Utils.isCelebornEnabled(conf)) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
// We may have a running task whose partition has been marked as
successful,
-Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+Index: core/src/main/scala/org/apache/spark/util/Utils.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(date 1652322695806)
-@@ -2231,7 +2231,8 @@
- // if the cluster manager explicitly tells us that the entire worker was
lost, then
- // we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
- // from a Standalone cluster, where the shuffle service lives in the
Worker.)
-- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
-+ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
-+ (workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled)
- removeExecutorAndUnregisterOutputs(
- execId = execId,
- fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
+--- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
++++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
+@@ -2934,6 +2934,9 @@
+ props.forEach((k, v) => resultProps.put(k, v))
+ resultProps
+ }
++
++ def isCelebornEnabled(conf: SparkConf): Boolean =
++ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
+ }
+
+ private[util] object CallerContext extends Logging {
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch
similarity index 86%
copy from assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
copy to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch
index 5cebe83c7..8d9bfd6a0 100644
--- a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch
@@ -13,33 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Index: core/src/main/scala/org/apache/spark/util/Utils.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
---- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (date
1652322540231)
-@@ -3222,6 +3222,9 @@
- }
- files.toSeq
- }
-+
-+ def isCelebornEnabled(conf: SparkConf): Boolean =
-+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
- }
-
- private[util] object CallerContext extends Logging {
+Subject: [PATCH] [CORE][SHUFFLE] Support enabling DRA with Apache Celeborn
+---
Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
---- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(date 1652322692344)
-@@ -211,7 +211,7 @@
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
+@@ -210,7 +210,7 @@
(decommissionEnabled &&
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
logWarning("Dynamic allocation without a shuffle service is an
experimental feature.")
@@ -48,15 +32,33 @@ diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
throw new SparkException("Dynamic allocation of executors requires
the external " +
"shuffle service. You may enable this through
spark.shuffle.service.enabled.")
}
+Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
+@@ -2080,7 +2080,8 @@
+ // if the cluster manager explicitly tells us that the entire worker was
lost, then
+ // we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
+ // from a Standalone cluster, where the shuffle service lives in the
Worker.)
+- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
++ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
++ (workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled)
+ removeExecutorAndUnregisterOutputs(
+ execId = execId,
+ fileLost = fileLost,
Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(date 1652323382069)
-@@ -1015,7 +1015,8 @@
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
+@@ -973,7 +973,8 @@
// and we are not using an external shuffle server which could serve the
shuffle outputs.
// The reason is the next stage wouldn't be able to fetch the data from
this dead executor
// so we would need to rerun these tasks on other executors.
@@ -66,21 +68,24 @@ diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
// We may have a running task whose partition has been marked as
successful,
-Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+Index: core/src/main/scala/org/apache/spark/util/Utils.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(date 1652322695806)
-@@ -2231,7 +2231,8 @@
- // if the cluster manager explicitly tells us that the entire worker was
lost, then
- // we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
- // from a Standalone cluster, where the shuffle service lives in the
Worker.)
-- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
-+ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
-+ (workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled)
- removeExecutorAndUnregisterOutputs(
- execId = execId,
- fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
+--- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
d1f8a503a26bcfb4e466d9accc5fa241a7933667)
++++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
+@@ -3057,7 +3057,12 @@
+ 0
+ }
+ }
++
++ def isCelebornEnabled(conf: SparkConf): Boolean =
++ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
+ }
++
++
+
+ private[util] object CallerContext extends Logging {
+ val callerContextSupported: Boolean = {
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_2.patch
similarity index 100%
copy from assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
copy to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_2.patch
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch
similarity index 81%
rename from assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
rename to assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch
index 5cebe83c7..e998e2e5f 100644
--- a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch
@@ -13,50 +13,52 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Index: core/src/main/scala/org/apache/spark/util/Utils.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
---- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (date
1652322540231)
-@@ -3222,6 +3222,9 @@
- }
- files.toSeq
- }
-+
-+ def isCelebornEnabled(conf: SparkConf): Boolean =
-+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
- }
-
- private[util] object CallerContext extends Logging {
+Subject: [PATCH] [CORE][SHUFFLE] Support enabling DRA with Apache Celeborn
+---
Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
---- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(date 1652322692344)
-@@ -211,7 +211,7 @@
- (decommissionEnabled &&
- conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
- logWarning("Dynamic allocation without a shuffle service is an
experimental feature.")
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision 8c2b3319c6734250ff9d72f3d7e5cab56b142195)
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision 5e318f8fbeadec03986b3ad9cad4c1cda33c9ed2)
+@@ -209,7 +209,7 @@
+ } else if (decommissionEnabled &&
+ conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+ logInfo("Shuffle data decommission is enabled without a shuffle
service.")
- } else if (!testing) {
+ } else if (!testing && !Utils.isCelebornEnabled(conf)) {
throw new SparkException("Dynamic allocation of executors requires
the external " +
"shuffle service. You may enable this through
spark.shuffle.service.enabled.")
}
+Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision 8c2b3319c6734250ff9d72f3d7e5cab56b142195)
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision 5e318f8fbeadec03986b3ad9cad4c1cda33c9ed2)
+@@ -2414,7 +2414,8 @@
+ // if the cluster manager explicitly tells us that the entire worker was
lost, then
+ // we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
+ // from a Standalone cluster, where the shuffle service lives in the
Worker.)
+- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
++ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
++ (workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled)
+ removeExecutorAndUnregisterOutputs(
+ execId = execId,
+ fileLost = fileLost,
Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(date 1652323382069)
-@@ -1015,7 +1015,8 @@
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision 8c2b3319c6734250ff9d72f3d7e5cab56b142195)
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision 5e318f8fbeadec03986b3ad9cad4c1cda33c9ed2)
+@@ -1032,7 +1032,8 @@
// and we are not using an external shuffle server which could serve the
shuffle outputs.
// The reason is the next stage wouldn't be able to fetch the data from
this dead executor
// so we would need to rerun these tasks on other executors.
@@ -64,23 +66,23 @@ diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+ if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled
&& !isZombie &&
+ !Utils.isCelebornEnabled(conf)) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
- val index = taskInfos(tid).index
+ val index = info.index
// We may have a running task whose partition has been marked as
successful,
-Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+Index: core/src/main/scala/org/apache/spark/util/Utils.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(date 1652322695806)
-@@ -2231,7 +2231,8 @@
- // if the cluster manager explicitly tells us that the entire worker was
lost, then
- // we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
- // from a Standalone cluster, where the shuffle service lives in the
Worker.)
-- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
-+ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
-+ (workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled)
- removeExecutorAndUnregisterOutputs(
- execId = execId,
- fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
+--- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
8c2b3319c6734250ff9d72f3d7e5cab56b142195)
++++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
5e318f8fbeadec03986b3ad9cad4c1cda33c9ed2)
+@@ -3246,6 +3246,9 @@
+ files.toSeq
+ }
+
++ def isCelebornEnabled(conf: SparkConf): Boolean =
++ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
++
+ /**
+ * Return the median number of a long array
+ *