This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 125b2f87d453 [SPARK-46861][CORE] Avoid Deadlock in DAGScheduler
125b2f87d453 is described below

commit 125b2f87d453a16325f24e7382707f2b365bba14
Author: fred-db <fredrik.kla...@databricks.com>
AuthorDate: Thu Jan 25 08:34:37 2024 -0800

    [SPARK-46861][CORE] Avoid Deadlock in DAGScheduler
    
    * The DAGScheduler can currently run into a deadlock with another thread
      if both access the partitions of the same RDD at the same time.
    * To make progress in getCacheLocs, we need exclusive access to both the
      RDD partitions and the location cache. We currently lock the location
      cache first, and then the RDD.
    * When accessing the partitions of an RDD, the RDD first acquires
      exclusive access to its partitions, and may then acquire exclusive
      access to the location cache.
    * If thread 1 acquires the lock on the RDD while thread 2 holds the lock
      on the location cache, we can run into a deadlock.
    * To fix this, acquire the locks in the same order everywhere: change the
      DAGScheduler to first acquire the lock on the RDD, and then the lock on
      the location cache.
    
    * This deadlock can occur in practice and can prevent any progress on the
      cluster (see the sketch below).
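
    As a minimal, hedged sketch of the problem (not the actual Spark code), the
    two orderings can be reduced to two plain JVM monitors. The names
    `rddState` and `locationCache` below are illustrative stand-ins for
    RDD.stateLock and the DAGScheduler's cacheLocs map:

        // Illustrative only: `rddState` stands in for RDD.stateLock and
        // `locationCache` for the DAGScheduler's cacheLocs map.
        object LockOrderingSketch {
          private val rddState = new Object
          private val locationCache = new Object

          // Old ordering in getCacheLocs: location cache first, then the RDD.
          // Run concurrently with accessPartitions(), this can deadlock.
          def getCacheLocsOldOrder(): Unit = locationCache.synchronized {
            rddState.synchronized { /* compute and cache the locations */ }
          }

          // Partition access: RDD first, then (possibly) the location cache,
          // e.g. a coalesced RDD consulting cache locations for its parent.
          def accessPartitions(): Unit = rddState.synchronized {
            locationCache.synchronized { /* read the cached locations */ }
          }

          // Fixed ordering in getCacheLocs: RDD first, then the location
          // cache. Both paths now acquire the locks in the same order, so a
          // cycle in lock acquisition (and hence a deadlock) is impossible.
          def getCacheLocsNewOrder(): Unit = rddState.synchronized {
            locationCache.synchronized { /* compute and cache the locations */ }
          }

          def main(args: Array[String]): Unit = {
            // With the fixed ordering both threads always finish; swapping in
            // getCacheLocsOldOrder() can hang, depending on thread timing.
            val t1 = new Thread(() => getCacheLocsNewOrder())
            val t2 = new Thread(() => accessPartitions())
            t1.start(); t2.start()
            t1.join(); t2.join()
          }
        }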
    
    * No
    
    * Unit test that reproduces the issue.
    
    No
    
    Closes #44882 from fred-db/fix-deadlock.
    
    Authored-by: fred-db <fredrik.kla...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 617014cc92d933c70c9865a578fceb265883badd)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 11 ++++---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 31 ++++++++++--------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 38 +++++++++++++++++++++-
 3 files changed, 62 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a21d2ae77396..f695b1020275 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -223,14 +223,17 @@ abstract class RDD[T: ClassTag](
    * not use `this` because RDDs are user-visible, so users might have added their own locking on
    * RDDs; sharing that could lead to a deadlock.
    *
-   * One thread might hold the lock on many of these, for a chain of RDD dependencies; but
-   * because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no
-   * chance of deadlock.
+   * One thread might hold the lock on many of these, for a chain of RDD dependencies. Deadlocks
+   * are possible if we try to lock another resource while holding the stateLock,
+   * and the lock acquisition sequence of these locks is not guaranteed to be the same.
+   * This can lead to a deadlock as one thread might first acquire the stateLock
+   * and then the resource, while another thread might first acquire the resource
+   * and then the stateLock.
    *
    * Executors may reference the shared fields (though they should never mutate them,
    * that only happens on the driver).
    */
-  private val stateLock = new Serializable {}
+  private[spark] val stateLock = new Serializable {}
 
   // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
   // be overwritten when we're checkpointed
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d8adaae19b90..89d16e579348 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -174,6 +174,9 @@ private[spark] class DAGScheduler(
    * locations where that RDD partition is cached.
    *
    * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
+   * If you need to access any RDD while synchronizing on the cache locations,
+   * first synchronize on the RDD, and then synchronize on this map to avoid deadlocks. The RDD
+   * could try to access the cache locations after synchronizing on the RDD.
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
@@ -420,22 +423,24 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler]
-  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
-    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
-    if (!cacheLocs.contains(rdd.id)) {
-      // Note: if the storage level is NONE, we don't need to get locations from block manager.
-      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
-        IndexedSeq.fill(rdd.partitions.length)(Nil)
-      } else {
-        val blockIds =
-          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
-        blockManagerMaster.getLocations(blockIds).map { bms =>
-          bms.map(bm => TaskLocation(bm.host, bm.executorId))
+  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = rdd.stateLock.synchronized {
+    cacheLocs.synchronized {
+      // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
+      if (!cacheLocs.contains(rdd.id)) {
+        // Note: if the storage level is NONE, we don't need to get locations from block manager.
+        val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
+          IndexedSeq.fill(rdd.partitions.length)(Nil)
+        } else {
+          val blockIds =
+            rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
+          blockManagerMaster.getLocations(blockIds).map { bms =>
+            bms.map(bm => TaskLocation(bm.host, bm.executorId))
+          }
         }
+        cacheLocs(rdd.id) = locs
       }
-      cacheLocs(rdd.id) = locs
+      cacheLocs(rdd.id)
     }
-    cacheLocs(rdd.id)
   }
 
   private def clearCacheLocs(): Unit = cacheLocs.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9b7c5d5ace31..1818bf9b152d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, ThreadUtils, Utils}
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -594,6 +594,42 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  // Note that this test is NOT guaranteed to reproduce the deadlock when one exists, as it uses
+  // Thread.sleep, but it should never fail / flake when there is no deadlock.
+  // If this test starts to flake, that indicates a deadlock!
+  test("No Deadlock between getCacheLocs and CoalescedRDD") {
+    val rdd = sc.parallelize(1 to 10, numSlices = 10)
+    val coalescedRDD = rdd.coalesce(2)
+    val executionContext = ThreadUtils.newDaemonFixedThreadPool(
+      nThreads = 2, "test-getCacheLocs")
+    // Used to only make progress on getCacheLocs after we acquired the lock on the RDD.
+    val rddLock = new java.util.concurrent.Semaphore(0)
+    val partitionsFuture = executionContext.submit(new Runnable {
+      override def run(): Unit = {
+        coalescedRDD.stateLock.synchronized {
+          rddLock.release(1)
+          // Try to access the partitions of the coalescedRDD. This will cause a call to
+          // getCacheLocs internally.
+          Thread.sleep(5000)
+          coalescedRDD.partitions
+        }
+      }
+    })
+    val getCacheLocsFuture = executionContext.submit(new Runnable {
+      override def run(): Unit = {
+        rddLock.acquire()
+        // Access the cache locations.
+        // If the partition location cache is locked before the stateLock is locked,
+        // we'll run into a deadlock.
+        sc.dagScheduler.getCacheLocs(coalescedRDD)
+      }
+    })
+    // If any of the futures throws a TimeoutException, this shows that there is a deadlock between
+    // getCacheLocs and accessing partitions of an RDD.
+    getCacheLocsFuture.get(120, TimeUnit.SECONDS)
+    partitionsFuture.get(120, TimeUnit.SECONDS)
+  }
+
   test("All shuffle files on the storage endpoint should be cleaned up when it 
is lost") {
     conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
