Repository: spark
Updated Branches:
  refs/heads/master c82fe4781 -> eb5bdcaf6


[SPARK-695] In DAGScheduler's getPreferredLocs, track set of visited partitions.

getPreferredLocs traverses a dependency graph of partitions using depth-first
search.  Given a complex dependency graph, the old implementation may explore a
number of paths in the graph that is exponential in the number of nodes.  By
maintaining a set of visited nodes, the new implementation avoids revisiting
nodes, preventing exponential blowup.

Some comment and whitespace cleanups are also included.

Author: Aaron Staple <[email protected]>

Closes #1362 from staple/SPARK-695 and squashes the following commits:

ecea0f3 [Aaron Staple] address review comments
751c661 [Aaron Staple] [SPARK-695] Add a unit test.
5adf326 [Aaron Staple] Replace getPreferredLocsInternal's HashMap argument with a simpler HashSet.
58e37d0 [Aaron Staple] Replace comment documenting NarrowDependency.
6751ced [Aaron Staple] Revert "Remove unused variable."
04c7097 [Aaron Staple] Fix indentation.
0030884 [Aaron Staple] Remove unused variable.
33f67c6 [Aaron Staple] Clarify comment.
4e42b46 [Aaron Staple] Remove apparently incorrect comment describing NarrowDependency.
65c2d3d [Aaron Staple] [SPARK-695] In DAGScheduler's getPreferredLocs, track set of visited partitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb5bdcaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb5bdcaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb5bdcaf

Branch: refs/heads/master
Commit: eb5bdcaf6c7834558cb76b7132f68b8d94230356
Parents: c82fe47
Author: Aaron Staple <[email protected]>
Authored: Fri Aug 1 12:04:04 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Fri Aug 1 12:04:04 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/Dependency.scala  |  4 ++--
 .../scala/org/apache/spark/SparkContext.scala     |  2 +-
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala |  4 ++--
 .../org/apache/spark/scheduler/DAGScheduler.scala | 18 +++++++++++++++++-
 .../spark/scheduler/DAGSchedulerSuite.scala       | 16 +++++++++++++++-
 5 files changed, 37 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eb5bdcaf/core/src/main/scala/org/apache/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 3935c87..ab2594c 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -34,8 +34,8 @@ abstract class Dependency[T] extends Serializable {
 
 /**
  * :: DeveloperApi ::
- * Base class for dependencies where each partition of the parent RDD is used by at most one
- * partition of the child RDD.  Narrow dependencies allow for pipelined execution.
+ * Base class for dependencies where each partition of the child RDD depends on a small number
+ * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
  */
 @DeveloperApi
 abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5bdcaf/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5f75c1d..368835a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -458,7 +458,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Distribute a local Scala collection to form an RDD, with one or more
     * location preferences (hostnames of Spark nodes) for each object.
     * Create a new partition for each collection item. */
-   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
+  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
     new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5bdcaf/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index e7221e3..11ebafb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -49,8 +49,8 @@ private[spark] case class CoalescedRDDPartition(
   }
 
   /**
-   * Computes how many of the parents partitions have getPreferredLocation
-   * as one of their preferredLocations
+   * Computes the fraction of the parents' partitions containing preferredLocation within
+   * their getPreferredLocs.
    * @return locality of this coalesced partition between 0 and 1
    */
   def localFraction: Double = {

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5bdcaf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c7e3d7c..5110785 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1148,6 +1148,22 @@ class DAGScheduler(
    */
   private[spark]
   def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
+    getPreferredLocsInternal(rdd, partition, new HashSet)
+  }
+
+  /** Recursive implementation for getPreferredLocs. */
+  private def getPreferredLocsInternal(
+      rdd: RDD[_],
+      partition: Int,
+      visited: HashSet[(RDD[_],Int)])
+    : Seq[TaskLocation] =
+  {
+    // If the partition has already been visited, no need to re-visit.
+    // This avoids exponential path exploration.  SPARK-695
+    if (!visited.add((rdd,partition))) {
+      // Nil has already been returned for previously visited partitions.
+      return Nil
+    }
     // If the partition is cached, return the cache locations
     val cached = getCacheLocs(rdd)(partition)
     if (!cached.isEmpty) {
@@ -1164,7 +1180,7 @@ class DAGScheduler(
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
         for (inPart <- n.getParents(partition)) {
-          val locs = getPreferredLocs(n.rdd, inPart)
+          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
           if (locs != Nil) {
             return locs
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5bdcaf/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 0ce13d0..36e238b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,6 +23,8 @@ import scala.language.reflectiveCalls
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
 import org.scalatest.{BeforeAndAfter, FunSuiteLike}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
@@ -64,7 +66,7 @@ class MyRDD(
 class DAGSchedulerSuiteDummyException extends Exception
 
 class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
-  with ImplicitSender with BeforeAndAfter with LocalSparkContext {
+  with ImplicitSender with BeforeAndAfter with LocalSparkContext with Timeouts {
 
   val conf = new SparkConf
   /** Set of TaskSets the DAGScheduler has requested executed. */
@@ -294,6 +296,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("avoid exponential blowup when getting preferred locs list") {
+    // Build up a complex dependency graph with repeated zip operations, without preferred locations.
+    var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
+    (1 to 30).foreach(_ => rdd = rdd.zip(rdd))
+    // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
+    failAfter(10 seconds) {
+      val preferredLocs = scheduler.getPreferredLocs(rdd,0)
+      // No preferred locations are returned.
+      assert(preferredLocs.length === 0)
+    }
+  }
+
   test("unserializable task") {
     val unserializableRdd = new MyRDD(sc, 1, Nil) {
       class UnserializableClass

Reply via email to