Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4f75ce2e1 -> fa55c2742


[SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps

The shuffle locality patch made the DAGScheduler aware of shuffle data,
but for RDDs that have both narrow and shuffle dependencies, it could
place tasks based on the shuffle dependency instead of the narrow one.
This case is common in iterative join-based algorithms like PageRank
and ALS, where one RDD is hash-partitioned and the other isn't.
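A minimal sketch of that scenario follows (not part of the patch; the
object name, dataset, and local master are illustrative). One side of the
join is hash-partitioned and cached, so the join stage gets a narrow
dependency on it and a shuffle dependency on the other side; with this
fix, task placement follows the cached narrow parent:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object NarrowVsShuffleLocality {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("locality-sketch").setMaster("local[4]"))

        // links reuses the join's partitioner, so the join has a narrow
        // (one-to-one) dependency on it; ranks is unpartitioned, so the
        // join has a shuffle dependency on ranks.
        val links = sc.parallelize(1 to 100).map(i => (i, i + 1))
          .partitionBy(new HashPartitioner(8)).cache()
        links.count() // materialize the cache so its partitions have locations
        val ranks = sc.parallelize(1 to 100).map(i => (i, 1.0))

        val joined = links.join(ranks)
        joined.count() // this stage should be placed near links's partitions
        sc.stop()
      }
    }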

Author: Matei Zaharia <ma...@databricks.com>

Closes #8220 from mateiz/shuffle-loc-fix.

(cherry picked from commit cf016075a006034c24c5b758edb279f3e151d25d)
Signed-off-by: Matei Zaharia <ma...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa55c274
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa55c274
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa55c274

Branch: refs/heads/branch-1.5
Commit: fa55c27427bec0291847d254f4424b754dd211c9
Parents: 4f75ce2
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sun Aug 16 00:34:58 2015 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sun Aug 16 00:35:09 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 37 +++++++++++---------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 26 ++++++++++++--
 2 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa55c274/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1c63d0..dadf83a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1383,33 +1383,36 @@ class DAGScheduler(
       return rddPrefs.map(TaskLocation(_))
     }
 
+    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
+    // that has any placement preferences. Ideally we would choose based on transfer sizes,
+    // but this will do for now.
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
-        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-        // that has any placement preferences. Ideally we would choose based on transfer sizes,
-        // but this will do for now.
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
           if (locs != Nil) {
             return locs
           }
         }
-      case s: ShuffleDependency[_, _, _] =>
-        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
-        // of data as preferred locations
-        if (shuffleLocalityEnabled &&
-            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
-            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
-          // Get the preferred map output locations for this reducer
-          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
-            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
-          if (topLocsForReducer.nonEmpty) {
-            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
-          }
-        }
-
       case _ =>
     }
+
+    // If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that
+    // have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
+    if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+      rdd.dependencies.foreach {
+        case s: ShuffleDependency[_, _, _] =>
+          if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
+            // Get the preferred map output locations for this reducer
+            val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+              partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
+            if (topLocsForReducer.nonEmpty) {
+              return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+            }
+          }
+        case _ =>
+      }
+    }
     Nil
   }
 

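To see the resulting placement from a driver program, building on the
sketch above (getPreferredLocs is a DeveloperApi on SparkContext; joined
is the hypothetical RDD from that sketch):

    // Preferred locations for partition 0 of the joined RDD. With this
    // patch they come from the narrow parent (links); previously they
    // could come from the shuffle dependency's map output locations.
    val locs = sc.getPreferredLocs(joined, 0)
    println(locs.mkString(", "))
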
http://git-wip-us.apache.org/repos/asf/spark/blob/fa55c274/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b0ca49c..a063596 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -926,7 +926,7 @@ class DAGSchedulerSuite
     assertLocations(reduceTaskSet, Seq(Seq("hostA")))
     complete(reduceTaskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("reduce task locality preferences should only include machines with largest map outputs") {
@@ -950,7 +950,29 @@ class DAGSchedulerSuite
     assertLocations(reduceTaskSet, Seq(hosts))
     complete(reduceTaskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
+  }
+
+  test("stages with both narrow and shuffle dependencies use narrow ones for 
locality") {
+    // Create an RDD that has both a shuffle dependency and a narrow 
dependency (e.g. for a join)
+    val rdd1 = new MyRDD(sc, 1, Nil)
+    val rdd2 = new MyRDD(sc, 1, Nil, locations = Seq(Seq("hostB")))
+    val shuffleDep = new ShuffleDependency(rdd1, null)
+    val narrowDep = new OneToOneDependency(rdd2)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep, narrowDep))
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostA")))
+
+    // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(Seq("hostB")))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
   }
 
   test("Spark exceptions should include call site in stack trace") {

