This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 3130ac9276bd [SPARK-46861][CORE] Avoid Deadlock in DAGScheduler
3130ac9276bd is described below
commit 3130ac9276bd43dd21aa1aa5e5ef920b00bc3aff
Author: fred-db <[email protected]>
AuthorDate: Thu Jan 25 08:34:37 2024 -0800
[SPARK-46861][CORE] Avoid Deadlock in DAGScheduler
* The DAGScheduler could currently run into a deadlock with another thread
if both access the partitions of the same RDD at the same time.
* To make progress in getCacheLocs, we require exclusive access to both the
RDD partitions and the location cache. getCacheLocs previously locked the
location cache first, and then the RDD.
* When accessing partitions of an RDD, the RDD first acquires exclusive
access to the partitions, and then might acquire exclusive access to the
location cache.
* If thread 1 holds the lock on the RDD while thread 2 holds the lock on the
location cache, each thread waits for the lock the other holds, and neither
can make progress: a deadlock.
* To fix this, acquire locks in the same order. Change the DAGScheduler to
first acquire the lock on the RDD, and then the lock on the location cache.
* Without this fix, this deadlock can occur in practice and prevent any
progress on the cluster.
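* User-facing change: No.
* How was this patch tested: Added a unit test that reproduces the issue.
Authored or co-authored using generative AI tooling: No.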
Closes #44882 from fred-db/fix-deadlock.
Authored-by: fred-db <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 617014cc92d933c70c9865a578fceb265883badd)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 11 ++++---
.../org/apache/spark/scheduler/DAGScheduler.scala | 31 ++++++++++--------
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 38 +++++++++++++++++++++-
3 files changed, 62 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 407820b663a3..fc5a2089f43b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -223,14 +223,17 @@ abstract class RDD[T: ClassTag](
* not use `this` because RDDs are user-visible, so users might have added
their own locking on
* RDDs; sharing that could lead to a deadlock.
*
- * One thread might hold the lock on many of these, for a chain of RDD
dependencies; but
- * because DAGs are acyclic, and we only ever hold locks for one path in
that DAG, there is no
- * chance of deadlock.
+ * One thread might hold the lock on many of these, for a chain of RDD
dependencies. Deadlocks
+ * are possible if we try to lock another resource while holding the
stateLock,
+ * and the lock acquisition sequence of these locks is not guaranteed to be
the same.
+ * This can lead to a deadlock as one thread might first acquire the
stateLock,
+ * and then the resource,
+ * while another thread might first acquire the resource, and then the
stateLock.
*
* Executors may reference the shared fields (though they should never
mutate them,
* that only happens on the driver).
*/
- private val stateLock = new Serializable {}
+ private[spark] val stateLock = new Serializable {}
// Our dependencies and partitions will be gotten by calling subclass's
methods below, and will
// be overwritten when we're checkpointed
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2a966fab6f02..26be8c72bbcb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -173,6 +173,9 @@ private[spark] class DAGScheduler(
* locations where that RDD partition is cached.
*
* All accesses to this map should be guarded by synchronizing on it (see
SPARK-4454).
+ * If you need to access any RDD while synchronizing on the cache locations,
+ * first synchronize on the RDD, and then synchronize on this map to avoid
deadlocks. The RDD
+ * could try to access the cache locations after synchronizing on the RDD.
*/
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
@@ -408,22 +411,24 @@ private[spark] class DAGScheduler(
}
private[scheduler]
- def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] =
cacheLocs.synchronized {
- // Note: this doesn't use `getOrElse()` because this method is called
O(num tasks) times
- if (!cacheLocs.contains(rdd.id)) {
- // Note: if the storage level is NONE, we don't need to get locations
from block manager.
- val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel ==
StorageLevel.NONE) {
- IndexedSeq.fill(rdd.partitions.length)(Nil)
- } else {
- val blockIds =
- rdd.partitions.indices.map(index => RDDBlockId(rdd.id,
index)).toArray[BlockId]
- blockManagerMaster.getLocations(blockIds).map { bms =>
- bms.map(bm => TaskLocation(bm.host, bm.executorId))
+ def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] =
rdd.stateLock.synchronized {
+ cacheLocs.synchronized {
+ // Note: this doesn't use `getOrElse()` because this method is called
O(num tasks) times
+ if (!cacheLocs.contains(rdd.id)) {
+ // Note: if the storage level is NONE, we don't need to get locations
from block manager.
+ val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel ==
StorageLevel.NONE) {
+ IndexedSeq.fill(rdd.partitions.length)(Nil)
+ } else {
+ val blockIds =
+ rdd.partitions.indices.map(index => RDDBlockId(rdd.id,
index)).toArray[BlockId]
+ blockManagerMaster.getLocations(blockIds).map { bms =>
+ bms.map(bm => TaskLocation(bm.host, bm.executorId))
+ }
}
+ cacheLocs(rdd.id) = locs
}
- cacheLocs(rdd.id) = locs
+ cacheLocs(rdd.id)
}
- cacheLocs(rdd.id)
}
private def clearCacheLocs(): Unit = cacheLocs.synchronized {
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 17abf3aef4e2..cb4cbc22e5d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -48,7 +48,7 @@ import
org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.shuffle.{FetchFailedException,
MetadataFetchFailedException}
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId,
BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite,
Clock, LongAccumulator, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite,
Clock, LongAccumulator, SystemClock, ThreadUtils, Utils}
class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -589,6 +589,42 @@ class DAGSchedulerSuite extends SparkFunSuite with
TempLocalSparkContext with Ti
assertDataStructuresEmpty()
}
+ // Note that this test is NOT perfectly reproducible when there is a
deadlock as it uses
+ // Thread.sleep, but it should never fail / flake when there is no deadlock.
+ // If this test starts to flake, this shows that there is a deadlock!
+ test("No Deadlock between getCacheLocs and CoalescedRDD") {
+ val rdd = sc.parallelize(1 to 10, numSlices = 10)
+ val coalescedRDD = rdd.coalesce(2)
+ val executionContext = ThreadUtils.newDaemonFixedThreadPool(
+ nThreads = 2, "test-getCacheLocs")
+ // Ensures getCacheLocs only makes progress after the other thread has
acquired the RDD's stateLock.
+ val rddLock = new java.util.concurrent.Semaphore(0)
+ val partitionsFuture = executionContext.submit(new Runnable {
+ override def run(): Unit = {
+ coalescedRDD.stateLock.synchronized {
+ rddLock.release(1)
+ // Try to access the partitions of the coalescedRDD. This will cause
a call to
+ // getCacheLocs internally.
+ Thread.sleep(5000)
+ coalescedRDD.partitions
+ }
+ }
+ })
+ val getCacheLocsFuture = executionContext.submit(new Runnable {
+ override def run(): Unit = {
+ rddLock.acquire()
+ // Access the cache locations.
+ // If the partition location cache is locked before the stateLock is
locked,
+ // we'll run into a deadlock.
+ sc.dagScheduler.getCacheLocs(coalescedRDD)
+ }
+ })
+ // If either future throws a TimeoutException, this shows that there
is a deadlock between
+ // getCacheLocs and accessing partitions of an RDD.
+ getCacheLocsFuture.get(120, TimeUnit.SECONDS)
+ partitionsFuture.get(120, TimeUnit.SECONDS)
+ }
+
test("All shuffle files on the storage endpoint should be cleaned up when it
is lost") {
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]