This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fbaa7417f [#1085] feat(spark): Add dynamic allocation patch for Spark 3.4 (#1225)
fbaa7417f is described below
commit fbaa7417f18840068b254dd70a3a6faa85e71044
Author: summaryzb <[email protected]>
AuthorDate: Sun Oct 8 01:17:35 2023 -0500
[#1085] feat(spark): Add dynamic allocation patch for Spark 3.4 (#1225)
### What changes were proposed in this pull request?
Add the dynamic allocation patch for Spark 3.4
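For reference, the patch is applied to a Spark source tree before rebuilding, as the README describes. A minimal sketch (the paths and build flags are illustrative, assuming a Spark 3.4.1 checkout):
```
# From the root of a Spark 3.4.1 source tree (paths are illustrative)
git apply /path/to/incubator-uniffle/patch/spark/spark-3.4.1_dynamic_allocation_support.patch
# Rebuild the Spark distribution so the patched core/sql classes take effect
./dev/make-distribution.sh --name rss-patched --tgz -Pyarn
```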
### Why are the changes needed?
https://github.com/apache/incubator-uniffle/issues/1085
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test
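
For context, the README section touched below tells users to enable dynamic allocation in spark conf after rebuilding with the patch. A hedged sketch of such a configuration (the coordinator quorum address is a placeholder; keys follow the Uniffle client docs):
```
spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
spark.rss.coordinator.quorum <coordinator-host>:19999
spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true
```
Note that the `isRssEnable()` check added by the patch keys off `spark.shuffle.manager` containing `RssShuffleManager`, so the first line is what activates the patched code paths.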
---
README.md | 2 +-
.../spark-3.4.1_dynamic_allocation_support.patch | 91 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 1 deletion(-)
diff --git a/README.md b/README.md
index dda6e8f0e..95e1c82f2 100644
--- a/README.md
+++ b/README.md
@@ -250,7 +250,7 @@ Deploy Steps:
 ### Support Spark dynamic allocation
 
 To support spark dynamic allocation with Uniffle, spark code should be updated.
-There are 4 patches for spark (2.4.6/3.1.2/3.2.1/3.3.1) in patch/spark folder for reference.
+There are 5 patches for spark (2.4.6/3.1.2/3.2.1/3.3.1/3.4.1) in patch/spark folder for reference.
 
 After apply the patch and rebuild spark, add following configuration in spark conf to enable dynamic allocation:
 ```
diff --git a/patch/spark/spark-3.4.1_dynamic_allocation_support.patch b/patch/spark/spark-3.4.1_dynamic_allocation_support.patch
new file mode 100644
index 000000000..2f79bba3b
--- /dev/null
+++ b/patch/spark/spark-3.4.1_dynamic_allocation_support.patch
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+index f06312c15cf..899af5df485 100644
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+@@ -204,7 +204,9 @@ private[spark] class ExecutorAllocationManager(
+ s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be
> 0!")
+ }
+ if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+- if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
++ if (conf.isRssEnable()) {
++ logInfo("Dynamic allocation will use remote shuffle service")
++ } else if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+ logInfo("Dynamic allocation is enabled without a shuffle service.")
+ } else if (decommissionEnabled &&
+ conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
+index 08344d8e547..ff3bab6710d 100644
+--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
++++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
+@@ -580,6 +580,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
+     Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
+   }
+ 
++  /**
++   * Return true if remote shuffle service is enabled.
++   */
++  def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager")
+ }
+ 
+ private[spark] object SparkConf extends Logging {
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+index 2a966fab6f0..89cfdfe8082 100644
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+@@ -2515,7 +2515,8 @@ private[spark] class DAGScheduler(
+     // if the cluster manager explicitly tells us that the entire worker was lost, then
+     // we know to unregister shuffle output. (Note that "worker" specifically refers to the process
+     // from a Standalone cluster, where the shuffle service lives in the Worker.)
+-    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
++    val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
++      !sc.getConf.isRssEnable()
+     removeExecutorAndUnregisterOutputs(
+       execId = execId,
+       fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+index 124a27502fe..38e6c9bca21 100644
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+@@ -1053,7 +1053,7 @@ private[spark] class TaskSetManager(
+     // could serve the shuffle outputs or the executor lost is caused by decommission (which
+     // can destroy the whole host). The reason is the next stage wouldn't be able to fetch the
+     // data from this dead executor so we would need to rerun these tasks on other executors.
+-    val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
++    val maybeShuffleMapOutputLoss = isShuffleMapTasks && !conf.isRssEnable() &&
+       (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)
+     if (maybeShuffleMapOutputLoss && !isZombie) {
+       for ((tid, info) <- taskInfos if info.executorId == execId) {
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+index 367732dbb20..ad42e827271 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+@@ -166,6 +166,9 @@ class ShuffledRowRDD(
+   }
+ 
+   override def getPreferredLocations(partition: Partition): Seq[String] = {
++    if (conf.isRssEnable()) {
++      return Nil
++    }
+     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+     partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
+       case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
+--
+2.39.3 (Apple Git-145)
+