This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 81844b0b0 [#1084] feat: Add dynamic allocation patch for Spark 3.3 
(#1224)
81844b0b0 is described below

commit 81844b0b08842428ca4338ea73e056575f30f443
Author: summaryzb <[email protected]>
AuthorDate: Fri Oct 6 20:37:47 2023 -0500

    [#1084] feat: Add dynamic allocation patch for Spark 3.3 (#1224)
    
    ### What changes were proposed in this pull request?
    Add the dynamic allocation patch for Spark 3.3
    
    ### Why are the changes needed?
    https://github.com/apache/incubator-uniffle/issues/1084
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test
---
 README.md                                          |  2 +-
 .../spark-3.3.1_dynamic_allocation_support.patch   | 92 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 0007510e5..dda6e8f0e 100644
--- a/README.md
+++ b/README.md
@@ -250,7 +250,7 @@ Deploy Steps:
 ### Support Spark dynamic allocation
 
 To support spark dynamic allocation with Uniffle, spark code should be updated.
-There are 3 patches for spark (2.4.6/3.1.2/3.2.1) in patch/spark folder for 
reference.
+There are 4 patches for spark (2.4.6/3.1.2/3.2.1/3.3.1) in patch/spark folder 
for reference.
 
 After apply the patch and rebuild spark, add following configuration in spark 
conf to enable dynamic allocation:
   ```
diff --git a/patch/spark/spark-3.3.1_dynamic_allocation_support.patch 
b/patch/spark/spark-3.3.1_dynamic_allocation_support.patch
new file mode 100644
index 000000000..194e873cd
--- /dev/null
+++ b/patch/spark/spark-3.3.1_dynamic_allocation_support.patch
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+index b9bc3c63ff4..ebb4371aeb5 100644
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+@@ -204,7 +204,9 @@ private[spark] class ExecutorAllocationManager(
+         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be 
> 0!")
+     }
+     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
++      if (conf.isRssEnable()) {
++        logInfo("Dynamic allocation will use remote shuffle service")
++      } else if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+         logInfo("Dynamic allocation is enabled without a shuffle service.")
+       } else if (decommissionEnabled &&
+           conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
+index 5f37a1abb19..af4bee1e1bb 100644
+--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
++++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
+@@ -580,6 +580,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Seria
+     Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v 
}.mkString("\n")
+   }
+ 
++  /**
++   * Return true if remote shuffle service is enabled.
++   */
++  def isRssEnable(): Boolean = get("spark.shuffle.manager", 
"sort").contains("RssShuffleManager")
+ }
+ 
+ private[spark] object SparkConf extends Logging {
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+index 0de60224179..8754212dd57 100644
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+@@ -2418,7 +2418,8 @@ private[spark] class DAGScheduler(
+     // if the cluster manager explicitly tells us that the entire worker was 
lost, then
+     // we know to unregister shuffle output.  (Note that "worker" 
specifically refers to the process
+     // from a Standalone cluster, where the shuffle service lives in the 
Worker.)
+-    val fileLost = workerHost.isDefined || 
!env.blockManager.externalShuffleServiceEnabled
++    val fileLost = (workerHost.isDefined || 
!env.blockManager.externalShuffleServiceEnabled) &&
++      !sc.getConf.isRssEnable()
+     removeExecutorAndUnregisterOutputs(
+       execId = execId,
+       fileLost = fileLost,
+diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+index 8523dc4c938..f7651fcd774 100644
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+@@ -1032,7 +1032,8 @@ private[spark] class TaskSetManager(
+     // and we are not using an external shuffle server which could serve the 
shuffle outputs.
+     // The reason is the next stage wouldn't be able to fetch the data from 
this dead executor
+     // so we would need to rerun these tasks on other executors.
+-    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled 
&& !isZombie) {
++    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled 
&&
++      !isZombie && !conf.isRssEnable()) {
+       for ((tid, info) <- taskInfos if info.executorId == execId) {
+         val index = info.index
+         // We may have a running task whose partition has been marked as 
successful,
+diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+index 47d61196fe8..98a5381bef4 100644
+--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
++++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+@@ -174,6 +174,9 @@ class ShuffledRowRDD(
+   }
+ 
+   override def getPreferredLocations(partition: Partition): Seq[String] = {
++    if (conf.isRssEnable()) {
++      return Nil
++    }
+     val tracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+     partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
+       case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
+-- 
+2.39.3 (Apple Git-145)
+

Reply via email to