Updated Branches: refs/heads/master 02f37ee85 -> ea34c5210
Fix inconsistent and incorrect log messages in shuffle read path

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/391133f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/391133f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/391133f6

Branch: refs/heads/master
Commit: 391133f66a41cf78cc200c20c0228eb99eebc6fd
Parents: 3745a18
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Mon Oct 7 17:08:06 2013 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Mon Oct 7 17:24:18 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/CacheManager.scala | 11 +++++------
 .../scala/org/apache/spark/storage/BlockManager.scala | 12 +++++++++++-
 2 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/391133f6/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3d36761..048168c 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -34,12 +34,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
       : Iterator[T] = {
     val key = "rdd_%d_%d".format(rdd.id, split.index)
-    logInfo("Cache key is " + key)
+    logInfo("Looking for partition " + key)
     blockManager.get(key) match {
-      case Some(cachedValues) =>
-        // Partition is in cache, so just return its values
-        logInfo("Found partition in cache!")
-        return cachedValues.asInstanceOf[Iterator[T]]
+      case Some(values) =>
+        // Partition is already materialized, so just return its values
+        return values.asInstanceOf[Iterator[T]]
 
       case None =>
         // Mark the split as loading (unless someone else marks it first)
@@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Computing partition " + split)
+          logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/391133f6/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 495a72d..37d0ddb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -523,7 +523,17 @@ private[spark] class BlockManager(
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: String): Option[Iterator[Any]] = {
-    getLocal(blockId).orElse(getRemote(blockId))
+    val local = getLocal(blockId)
+    if (local.isDefined) {
+      logInfo("Found block %s locally".format(blockId))
+      return local
+    }
+    val remote = getRemote(blockId)
+    if (remote.isDefined) {
+      logInfo("Found block %s remotely".format(blockId))
+      return remote
+    }
+    None
   }
 
   /**