This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 4f46e8f  [SPARK-28917][CORE] Synchronize access to RDD mutable state
4f46e8f is described below

commit 4f46e8f804cba6d845116cb7daf9b4c682e6a0f1
Author: Imran Rashid <[email protected]>
AuthorDate: Tue Oct 8 11:35:54 2019 -0700

    [SPARK-28917][CORE] Synchronize access to RDD mutable state
    
    RDD dependencies and partitions can be simultaneously
    accessed and mutated by user threads and Spark's scheduler threads, so
    access must be thread-safe.  In particular, as partitions and
    dependencies are lazily initialized, before this change they could get
    initialized multiple times, which would lead to the scheduler having an
    inconsistent view of the pending stages and getting stuck.
    
    Tested with existing unit tests.
    
    Closes #25951 from squito/SPARK-28917.
    
    Authored-by: Imran Rashid <[email protected]>
    Signed-off-by: Marcelo Vanzin <[email protected]>
    (cherry picked from commit 0da667d31436c43e06cb6bb5ac65a17f65edd08b)
    Signed-off-by: Marcelo Vanzin <[email protected]>
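
A note on the pattern before the diff: the core change is classic
double-checked locking. Below is a minimal standalone sketch of it (the
LazyDeps/computeDeps names are hypothetical, not Spark code): the field
is @volatile so a read outside the lock sees a fully published value, and
the second null check under a dedicated lock ensures the potentially
expensive initializer runs at most once.

    class LazyDeps {
      // Dedicated lock object rather than `this`: callers who synchronize
      // on the instance itself can never deadlock against internal locking.
      private val stateLock = new Object

      @volatile private var deps: Seq[String] = _

      // Stand-in for the potentially expensive getDependencies call.
      protected def computeDeps(): Seq[String] = Seq("parent-1", "parent-2")

      final def dependencies: Seq[String] = {
        if (deps == null) {              // fast path: no lock once initialized
          stateLock.synchronized {
            if (deps == null) {          // re-check under the lock
              deps = computeDeps()
            }
          }
        }
        deps
      }
    }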
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 41 +++++++++++++++++-----
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  6 ++--
 .../scala/org/apache/spark/DistributedSuite.scala  | 15 ++++++++
 3 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0be2543..31c6bc1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -222,10 +222,24 @@ abstract class RDD[T: ClassTag](
   /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
   def getStorageLevel: StorageLevel = storageLevel
 
+  /**
+   * Lock for all mutable state of this RDD (persistence, partitions, dependencies, etc.).  We do
+   * not use `this` because RDDs are user-visible, so users might have added their own locking on
+   * RDDs; sharing that could lead to a deadlock.
+   *
+   * One thread might hold the lock on many of these, for a chain of RDD dependencies; but
+   * because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no
+   * chance of deadlock.
+   *
+   * The use of Integer is simply so this is serializable -- executors may reference the shared
+   * fields (though they should never mutate them, that only happens on the driver).
+   */
+  private val stateLock = new Integer(0)
+
   // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
   // be overwritten when we're checkpointed
-  private var dependencies_ : Seq[Dependency[_]] = _
-  @transient private var partitions_ : Array[Partition] = _
+  @volatile private var dependencies_ : Seq[Dependency[_]] = _
+  @volatile @transient private var partitions_ : Array[Partition] = _
 
   /** An Option holding our checkpoint RDD, if we are checkpointed */
   private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)
@@ -237,7 +251,11 @@ abstract class RDD[T: ClassTag](
   final def dependencies: Seq[Dependency[_]] = {
     checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
       if (dependencies_ == null) {
-        dependencies_ = getDependencies
+        stateLock.synchronized {
+          if (dependencies_ == null) {
+            dependencies_ = getDependencies
+          }
+        }
       }
       dependencies_
     }
@@ -250,10 +268,14 @@ abstract class RDD[T: ClassTag](
   final def partitions: Array[Partition] = {
     checkpointRDD.map(_.partitions).getOrElse {
       if (partitions_ == null) {
-        partitions_ = getPartitions
-        partitions_.zipWithIndex.foreach { case (partition, index) =>
-          require(partition.index == index,
-            s"partitions($index).partition == ${partition.index}, but it 
should equal $index")
+        stateLock.synchronized {
+          if (partitions_ == null) {
+            partitions_ = getPartitions
+            partitions_.zipWithIndex.foreach { case (partition, index) =>
+              require(partition.index == index,
+                s"partitions($index).partition == ${partition.index}, but it 
should equal $index")
+            }
+          }
         }
       }
       partitions_
@@ -1798,7 +1820,7 @@ abstract class RDD[T: ClassTag](
   * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
   * created from the checkpoint file, and forget its old dependencies and partitions.
    */
-  private[spark] def markCheckpointed(): Unit = {
+  private[spark] def markCheckpointed(): Unit = stateLock.synchronized {
     clearDependencies()
     partitions_ = null
     deps = null    // Forget the constructor argument for dependencies too
@@ -1810,7 +1832,7 @@ abstract class RDD[T: ClassTag](
   * collected. Subclasses of RDD may override this method for implementing their own cleaning
    * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
    */
-  protected def clearDependencies(): Unit = {
+  protected def clearDependencies(): Unit = stateLock.synchronized {
     dependencies_ = null
   }
 
@@ -1969,6 +1991,7 @@ abstract class RDD[T: ClassTag](
       deterministicLevelCandidates.maxBy(_.id)
     }
   }
+
 }
 
 
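The stateLock comment above argues that nested lock acquisition along a
dependency chain cannot deadlock. A minimal sketch of that argument
(hypothetical Node class, not Spark code): one thread may hold several of
these locks at once, but acquisition always proceeds child -> parent
along a single path in an acyclic graph, so a cycle of lock waits cannot
form.

    class Node(val parents: Seq[Node]) {
      private val stateLock = new Object
      def resolve(): Int = stateLock.synchronized {
        // While holding our lock, take each parent's lock in turn (nested).
        1 + parents.map(_.resolve()).sum
      }
    }

    // Usage: a diamond-shaped DAG; resolve() holds up to three locks at once.
    object Demo {
      def main(args: Array[String]): Unit = {
        val root  = new Node(Nil)
        val left  = new Node(Seq(root))
        val right = new Node(Seq(root))
        val sink  = new Node(Seq(left, right))
        println(sink.resolve())  // 5 (root is reached, and counted, twice)
      }
    }
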
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5be3c40..d0d12d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -397,7 +397,8 @@ private[spark] class DAGScheduler(
     if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
-      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
+      logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to 
" +
+        s"shuffle ${shuffleDep.shuffleId}")
       mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
     }
     stage
@@ -1060,7 +1061,8 @@ private[spark] class DAGScheduler(
   private def submitStage(stage: Stage) {
     val jobId = activeJobForStage(stage)
     if (jobId.isDefined) {
-      logDebug("submitStage(" + stage + ")")
+      logDebug(s"submitStage($stage (name=${stage.name};" +
+        s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
       if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
         val missing = getMissingParentStages(stage).sortBy(_.id)
         logDebug("missing: " + missing)
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 629a323..9398d5e 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -336,6 +336,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
     }
   }
 
+  test("reference partitions inside a task") {
+    // Run a simple job which just makes sure there is no failure if we touch rdd.partitions
+    // inside a task.  This requires the stateLock to be serializable.  This is a very convoluted
+    // use case; it's just a check for backwards-compatibility after the fix for SPARK-28917.
+    sc = new SparkContext("local-cluster[1,1,1024]", "test")
+    val rdd1 = sc.parallelize(1 to 10, 1)
+    val rdd2 = rdd1.map { x => x + 1}
+    // ensure we can force computation of rdd2.dependencies inside a task.  Just touching
+    // it will force computation and touch the stateLock.  The check for null is just
+    // to make sure that we've set up our test correctly, and haven't precomputed dependencies
+    // in the driver
+    val dependencyComputeCount = rdd1.map { x => if (rdd2.dependencies == null) 1 else 0}.sum()
+    assert(dependencyComputeCount > 0)
+  }
+
 }
 
 object DistributedSuite {

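The new test works because `new Integer(0)` is Serializable, so an RDD
captured in a task closure (here, rdd2 inside rdd1's map function) can be
java-serialized along with its lock and shipped to executors. Below is a
rough standalone sketch of that constraint (the Rddish class is
hypothetical, not Spark code); a plain `new Object` lock would make
writeObject throw NotSerializableException, and `new Integer(0)`, unlike
the JVM-cached Integer.valueOf(0), yields a distinct lock per instance.

    import java.io._

    class Rddish extends Serializable {
      private val stateLock = new Integer(0)   // serializable lock object
      def dependencies: Seq[String] = stateLock.synchronized { Seq("dep") }
    }

    object SerializableLockCheck {
      def main(args: Array[String]): Unit = {
        val out = new ByteArrayOutputStream()
        new ObjectOutputStream(out).writeObject(new Rddish)  // ok: whole graph serializable
        val in = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
        val copy = in.readObject().asInstanceOf[Rddish]
        println(copy.dependencies)  // lock still usable on the deserialized copy
      }
    }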

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]