This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 125b2f87d453  [SPARK-46861][CORE] Avoid Deadlock in DAGScheduler
125b2f87d453 is described below

commit 125b2f87d453a16325f24e7382707f2b365bba14
Author: fred-db <fredrik.kla...@databricks.com>
AuthorDate: Thu Jan 25 08:34:37 2024 -0800

    [SPARK-46861][CORE] Avoid Deadlock in DAGScheduler

    ### What changes were proposed in this pull request?

    * The DAGScheduler can currently run into a deadlock with another thread if both access the
      partitions of the same RDD at the same time.
    * To make progress in getCacheLocs, we need exclusive access to both the RDD partitions and
      the location cache. We first lock the location cache, and then the RDD.
    * When accessing the partitions of an RDD, the RDD first acquires exclusive access to its
      partitions, and then might acquire exclusive access to the location cache.
    * If thread 1 acquires the lock on the RDD while thread 2 holds the lock on the location
      cache, we run into a deadlock.
    * To fix this, acquire the locks in the same order: change the DAGScheduler to first take
      the lock on the RDD, and then the lock on the location cache.

    ### Why are the changes needed?

    This deadlock can prevent any progress on the cluster.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Unit test that reproduces the issue.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #44882 from fred-db/fix-deadlock.

    Authored-by: fred-db <fredrik.kla...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 617014cc92d933c70c9865a578fceb265883badd)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 11 ++++---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 31 ++++++++++--------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 38 +++++++++++++++++++++-
 3 files changed, 62 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a21d2ae77396..f695b1020275 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -223,14 +223,17 @@ abstract class RDD[T: ClassTag](
    * not use `this` because RDDs are user-visible, so users might have added their own locking on
    * RDDs; sharing that could lead to a deadlock.
    *
-   * One thread might hold the lock on many of these, for a chain of RDD dependencies; but
-   * because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no
-   * chance of deadlock.
+   * One thread might hold the lock on many of these, for a chain of RDD dependencies. Deadlocks
+   * are possible if we try to lock another resource while holding the stateLock,
+   * and the lock acquisition sequence of these locks is not guaranteed to be the same.
+   * This can lead to a deadlock as one thread might first acquire the stateLock,
+   * and then the resource,
+   * while another thread might first acquire the resource, and then the stateLock.
    *
    * Executors may reference the shared fields (though they should never mutate them,
    * that only happens on the driver).
    */
-  private val stateLock = new Serializable {}
+  private[spark] val stateLock = new Serializable {}
 
   // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
   // be overwritten when we're checkpointed
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d8adaae19b90..89d16e579348 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -174,6 +174,9 @@ private[spark] class DAGScheduler(
    * locations where that RDD partition is cached.
    *
    * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
+   * If you need to access any RDD while synchronizing on the cache locations,
+   * first synchronize on the RDD, and then synchronize on this map to avoid deadlocks. The RDD
+   * could try to access the cache locations after synchronizing on the RDD.
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
@@ -420,22 +423,24 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler]
-  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
-    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
-    if (!cacheLocs.contains(rdd.id)) {
-      // Note: if the storage level is NONE, we don't need to get locations from block manager.
-      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
-        IndexedSeq.fill(rdd.partitions.length)(Nil)
-      } else {
-        val blockIds =
-          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
-        blockManagerMaster.getLocations(blockIds).map { bms =>
-          bms.map(bm => TaskLocation(bm.host, bm.executorId))
+  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = rdd.stateLock.synchronized {
+    cacheLocs.synchronized {
+      // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
+      if (!cacheLocs.contains(rdd.id)) {
+        // Note: if the storage level is NONE, we don't need to get locations from block manager.
+        val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
+          IndexedSeq.fill(rdd.partitions.length)(Nil)
+        } else {
+          val blockIds =
+            rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
+          blockManagerMaster.getLocations(blockIds).map { bms =>
+            bms.map(bm => TaskLocation(bm.host, bm.executorId))
+          }
         }
+        cacheLocs(rdd.id) = locs
       }
-      cacheLocs(rdd.id) = locs
+      cacheLocs(rdd.id)
     }
-    cacheLocs(rdd.id)
   }
 
   private def clearCacheLocs(): Unit = cacheLocs.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9b7c5d5ace31..1818bf9b152d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, ThreadUtils, Utils}
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -594,6 +594,42 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  // Note that this test is NOT perfectly reproducible when there is a deadlock as it uses
+  // Thread.sleep, but it should never fail / flake when there is no deadlock.
+  // If this test starts to flake, this shows that there is a deadlock!
+  test("No Deadlock between getCacheLocs and CoalescedRDD") {
+    val rdd = sc.parallelize(1 to 10, numSlices = 10)
+    val coalescedRDD = rdd.coalesce(2)
+    val executionContext = ThreadUtils.newDaemonFixedThreadPool(
+      nThreads = 2, "test-getCacheLocs")
+    // Used to only make progress on getCacheLocs after we acquired the lock on the RDD.
+    val rddLock = new java.util.concurrent.Semaphore(0)
+    val partitionsFuture = executionContext.submit(new Runnable {
+      override def run(): Unit = {
+        coalescedRDD.stateLock.synchronized {
+          rddLock.release(1)
+          // Try to access the partitions of the coalescedRDD. This will cause a call to
+          // getCacheLocs internally.
+          Thread.sleep(5000)
+          coalescedRDD.partitions
+        }
+      }
+    })
+    val getCacheLocsFuture = executionContext.submit(new Runnable {
+      override def run(): Unit = {
+        rddLock.acquire()
+        // Access the cache locations.
+        // If the partition location cache is locked before the stateLock is locked,
+        // we'll run into a deadlock.
+        sc.dagScheduler.getCacheLocs(coalescedRDD)
+      }
+    })
+    // If any of the futures throws a TimeoutException, this shows that there is a deadlock between
+    // getCacheLocs and accessing partitions of an RDD.
+    getCacheLocsFuture.get(120, TimeUnit.SECONDS)
+    partitionsFuture.get(120, TimeUnit.SECONDS)
+  }
+
   test("All shuffle files on the storage endpoint should be cleaned up when it is lost") {
     conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
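For illustration only, here is a minimal, self-contained Scala sketch of the lock-ordering idea described in the commit message and in the new cacheLocs comment. It is not part of the patch: `LockOrderingSketch`, `rddState`, `locationCache`, and `computePartitions` are simplified stand-ins for `RDD.stateLock`, `DAGScheduler.cacheLocs`, and partition computation. Both code paths take the RDD lock first and the cache lock second, so the circular wait described above cannot occur.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the fix; not Spark code.
object LockOrderingSketch {
  // Stand-in for RDD.stateLock (one lock per RDD).
  private val rddState = new Object
  // Stand-in for DAGScheduler.cacheLocs, guarded by its own monitor.
  private val locationCache = mutable.Map.empty[Int, Seq[String]]

  // Mirrors the fixed getCacheLocs: lock the RDD state first, then the location cache.
  // Before the fix, this path locked the cache first, the reverse of computePartitions
  // below, which allowed a circular wait.
  def getCacheLocs(rddId: Int): Seq[String] = rddState.synchronized {
    locationCache.synchronized {
      locationCache.getOrElseUpdate(rddId, Seq.empty)
    }
  }

  // Mirrors partition computation: it already holds the RDD lock and may then
  // consult the location cache (here, by calling getCacheLocs).
  def computePartitions(rddId: Int): Int = rddState.synchronized {
    getCacheLocs(rddId).size
  }

  def main(args: Array[String]): Unit = {
    // Two threads hammering the same "RDD". With a single consistent order
    // (rddState before locationCache), neither thread can block the other forever.
    val t1 = new Thread(() => (1 to 1000).foreach(_ => getCacheLocs(42)))
    val t2 = new Thread(() => (1 to 1000).foreach(_ => computePartitions(42)))
    t1.start(); t2.start()
    t1.join(); t2.join()
    println("finished without deadlock")
  }
}
```

Because intrinsic locks are reentrant, computePartitions can re-enter rddState.synchronized inside getCacheLocs; the ordering only has to be consistent across threads, which is what the DAGScheduler change enforces by taking the RDD's stateLock before synchronizing on cacheLocs.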