Updated Branches:
  refs/heads/branch-0.8 f2cdcc465 -> 8e9bd9371

Merge pull request #42 from pwendell/shuffle-read-perf

Fix inconsistent and incorrect log messages in shuffle read path

The user-facing messages generated by the CacheManager are currently wrong and 
somewhat misleading. This patch makes the messages more accurate. It also uses 
a consistent representation of the partition being fetched (`rdd_xx_yy`) so 
that it's easier for users to trace what is going on when reading logs.
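
The key itself is built as "rdd_%d_%d".format(rdd.id, split.index), i.e. the
RDD id and the partition index under an `rdd_` prefix. A minimal sketch of the
naming scheme (the `blockKey` helper is hypothetical, not part of this patch):

    // Hypothetical helper illustrating the key format; the patch itself
    // builds the key inline as "rdd_%d_%d".format(rdd.id, split.index).
    def blockKey(rddId: Int, partitionIndex: Int): String =
      "rdd_%d_%d".format(rddId, partitionIndex)

    // Partition 3 of RDD 7 is therefore logged as "rdd_7_3".
    assert(blockKey(7, 3) == "rdd_7_3")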

(cherry picked from commit ea34c521025d3408d44d45ab5c132fd9791794f6)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8e9bd937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8e9bd937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8e9bd937

Branch: refs/heads/branch-0.8
Commit: 8e9bd93713b47e77ed72a64572cba17f5ba0f48a
Parents: f2cdcc4
Author: Reynold Xin <r...@apache.org>
Authored: Mon Oct 7 20:45:58 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Mon Oct 7 20:47:09 2013 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/CacheManager.scala | 17 ++++++++---------
 .../org/apache/spark/storage/BlockManager.scala    | 12 +++++++++++-
 2 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8e9bd937/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca..3aeea99 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -32,22 +32,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
       : Iterator[T] = {
     val key = "rdd_%d_%d".format(rdd.id, split.index)
-    logInfo("Cache key is " + key)
+    logDebug("Looking for partition " + key)
     blockManager.get(key) match {
-      case Some(cachedValues) =>
-        // Partition is in cache, so just return its values
-        logInfo("Found partition in cache!")
-        return cachedValues.asInstanceOf[Iterator[T]]
+      case Some(values) =>
+        // Partition is already materialized, so just return its values
+        return values.asInstanceOf[Iterator[T]]
 
       case None =>
         // Mark the split as loading (unless someone else marks it first)
         loading.synchronized {
           if (loading.contains(key)) {
-            logInfo("Loading contains " + key + ", waiting...")
+            logInfo("Another thread is loading %s, waiting for it to 
finish...".format(key))
             while (loading.contains(key)) {
               try {loading.wait()} catch {case _ : Throwable =>}
             }
-            logInfo("Loading no longer contains " + key + ", so returning 
cached result")
+            logInfo("Finished waiting for %s".format(key))
             // See whether someone else has successfully loaded it. The main way this would fail
             // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
             // partition but we didn't want to make space for it. However, that case is unlikely
@@ -57,7 +56,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               case Some(values) =>
                 return values.asInstanceOf[Iterator[T]]
               case None =>
-                logInfo("Whoever was loading " + key + " failed; we'll try it 
ourselves")
+                logInfo("Whoever was loading %s failed; we'll try it 
ourselves".format(key))
                 loading.add(key)
             }
           } else {
@@ -66,7 +65,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Computing partition " + split)
+          logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
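
The hunk above also shows the per-partition loading handshake: the first
thread to claim a key computes it while other threads wait on the shared
`loading` set and then re-check the cache, falling back to computing it
themselves if the winner failed. A stand-alone sketch of that pattern, with
hypothetical names and an Int-valued map standing in for the real block
store (not the actual Spark code):

    import scala.collection.mutable

    // Simplified model of the handshake: `cache` stands in for the
    // BlockManager and `compute` for rdd.computeOrReadCheckpoint.
    object ComputeOnce {
      private val loading = new mutable.HashSet[String]
      private val cache = new mutable.HashMap[String, Int]

      def getOrCompute(key: String)(compute: => Int): Int = {
        val cached = loading.synchronized {
          // Wait while some other thread is loading this key...
          while (loading.contains(key)) loading.wait()
          // ...then re-check the cache; claim the key if it is still absent.
          val hit = cache.synchronized(cache.get(key))
          if (hit.isEmpty) loading.add(key)
          hit
        }
        cached match {
          case Some(v) => v
          case None =>
            try {
              val v = compute
              cache.synchronized(cache(key) = v)
              v
            } finally loading.synchronized {
              loading.remove(key)   // release the claim
              loading.notifyAll()   // wake any waiting threads
            }
        }
      }
    }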

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8e9bd937/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 60fdc5f..f30e74d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -498,7 +498,17 @@ private[spark] class BlockManager(
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: String): Option[Iterator[Any]] = {
-    getLocal(blockId).orElse(getRemote(blockId))
+    val local = getLocal(blockId)
+    if (local.isDefined) {
+      logInfo("Found block %s locally".format(blockId))
+      return local
+    }
+    val remote = getRemote(blockId)
+    if (remote.isDefined) {
+      logInfo("Found block %s remotely".format(blockId))
+      return remote
+    }
+    None
   }
 
   /**
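
For context on the BlockManager change: Option.orElse takes its argument
by name, so the old one-liner getLocal(blockId).orElse(getRemote(blockId))
already skipped the remote lookup on a local hit, but it could not log which
path produced the block. A stand-alone sketch of the expanded pattern
(`lookupLocal`, `lookupRemote`, and println are hypothetical stand-ins for
the real methods and logInfo; not the actual Spark code):

    // Try the cheap local lookup first; fall back to the remote one,
    // logging which path succeeded. Hypothetical stand-ins only.
    def find(id: String,
             lookupLocal: String => Option[String],
             lookupRemote: String => Option[String]): Option[String] =
      lookupLocal(id) match {
        case hit @ Some(_) =>
          println("Found block " + id + " locally")
          hit
        case None =>
          val remote = lookupRemote(id)
          if (remote.isDefined) println("Found block " + id + " remotely")
          remote
      }

    // e.g. find("rdd_7_3", localMap.get, remoteMap.get)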
