Updated Branches: refs/heads/branch-0.8 f2cdcc465 -> 8e9bd9371
Merge pull request #42 from pwendell/shuffle-read-perf

Fix inconsistent and incorrect log messages in shuffle read path

The user-facing messages generated by the CacheManager are currently wrong
and somewhat misleading. This patch makes the messages more accurate. It
also uses a consistent representation of the partition being fetched
(`rdd_xx_yy`) so that it's easier for users to trace what is going on when
reading logs.

(cherry picked from commit ea34c521025d3408d44d45ab5c132fd9791794f6)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8e9bd937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8e9bd937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8e9bd937

Branch: refs/heads/branch-0.8
Commit: 8e9bd93713b47e77ed72a64572cba17f5ba0f48a
Parents: f2cdcc4
Author: Reynold Xin <r...@apache.org>
Authored: Mon Oct 7 20:45:58 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Mon Oct 7 20:47:09 2013 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/CacheManager.scala | 17 ++++++++---------
 .../org/apache/spark/storage/BlockManager.scala    | 12 +++++++++++-
 2 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8e9bd937/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca..3aeea99 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -32,22 +32,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
       : Iterator[T] = {
     val key = "rdd_%d_%d".format(rdd.id, split.index)
-    logInfo("Cache key is " + key)
+    logDebug("Looking for partition " + key)
     blockManager.get(key) match {
-      case Some(cachedValues) =>
-        // Partition is in cache, so just return its values
-        logInfo("Found partition in cache!")
-        return cachedValues.asInstanceOf[Iterator[T]]
+      case Some(values) =>
+        // Partition is already materialized, so just return its values
+        return values.asInstanceOf[Iterator[T]]
 
       case None =>
         // Mark the split as loading (unless someone else marks it first)
         loading.synchronized {
           if (loading.contains(key)) {
-            logInfo("Loading contains " + key + ", waiting...")
+            logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
             while (loading.contains(key)) {
               try {loading.wait()} catch {case _ : Throwable =>}
             }
-            logInfo("Loading no longer contains " + key + ", so returning cached result")
+            logInfo("Finished waiting for %s".format(key))
             // See whether someone else has successfully loaded it. The main way this would fail
             // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
             // partition but we didn't want to make space for it. However, that case is unlikely
@@ -57,7 +56,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               case Some(values) =>
                 return values.asInstanceOf[Iterator[T]]
               case None =>
-                logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
+                logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
                 loading.add(key)
             }
           } else {
@@ -66,7 +65,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Computing partition " + split)
+          logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
          if (context.runningLocally) { return computedValues }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8e9bd937/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 60fdc5f..f30e74d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -498,7 +498,17 @@ private[spark] class BlockManager(
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: String): Option[Iterator[Any]] = {
-    getLocal(blockId).orElse(getRemote(blockId))
+    val local = getLocal(blockId)
+    if (local.isDefined) {
+      logInfo("Found block %s locally".format(blockId))
+      return local
+    }
+    val remote = getRemote(blockId)
+    if (remote.isDefined) {
+      logInfo("Found block %s remotely".format(blockId))
+      return remote
+    }
+    None
   }
 
   /**
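For readers tracing the new log output, here is a minimal standalone sketch (not Spark source) of the two ideas the patch standardizes on: the `rdd_<rddId>_<partitionIndex>` key format that now appears in the messages, and the local-then-remote lookup with logging added to BlockManager.get. The LocalLookupSketch object, its toy in-memory maps, and its println-based logging are hypothetical stand-ins for the real BlockManager and Logging trait.

// Standalone sketch, assuming simplified in-memory "stores" instead of real block managers.
object LocalLookupSketch {

  // Same key format the patch uses consistently: rdd_<rddId>_<partitionIndex>
  def rddBlockKey(rddId: Int, partitionIndex: Int): String =
    "rdd_%d_%d".format(rddId, partitionIndex)

  // Hypothetical stand-ins for the local and remote block stores.
  val localStore: Map[String, Seq[Int]]  = Map("rdd_3_0" -> Seq(1, 2, 3))
  val remoteStore: Map[String, Seq[Int]] = Map("rdd_3_1" -> Seq(4, 5, 6))

  def getLocal(blockId: String): Option[Iterator[Int]]  = localStore.get(blockId).map(_.iterator)
  def getRemote(blockId: String): Option[Iterator[Int]] = remoteStore.get(blockId).map(_.iterator)

  // Mirrors the control flow of the patched BlockManager.get: try local first,
  // log where the block was found, fall back to remote, otherwise return None.
  def get(blockId: String): Option[Iterator[Int]] = {
    val local = getLocal(blockId)
    if (local.isDefined) {
      println("Found block %s locally".format(blockId))
      return local
    }
    val remote = getRemote(blockId)
    if (remote.isDefined) {
      println("Found block %s remotely".format(blockId))
      return remote
    }
    None
  }

  def main(args: Array[String]): Unit = {
    get(rddBlockKey(3, 0))   // prints: Found block rdd_3_0 locally
    get(rddBlockKey(3, 1))   // prints: Found block rdd_3_1 remotely
    get(rddBlockKey(3, 2))   // found nowhere; returns None without logging
  }
}

With this naming, a log line such as "Partition rdd_3_1 not found, computing it" can be read directly as "partition 1 of RDD 3", which is the traceability the commit message describes.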