Repository: spark
Updated Branches:
  refs/heads/master bc87cc410 -> 553737c6e


[SPARK-3825] Log more detail when unrolling a block fails

Before:
```
14/10/06 16:45:42 WARN CacheManager: Not enough space to cache partition rdd_0_2
in memory! Free memory is 481861527 bytes.
```
After:
```
14/10/07 11:08:24 WARN MemoryStore: Not enough space to cache rdd_2_0 in memory!
(computed 68.8 MB so far)
14/10/07 11:08:24 INFO MemoryStore: Memory use = 1088.0 B (blocks) + 445.1 MB
(scratch space shared across 8 thread(s)) = 445.1 MB. Storage limit = 459.5 MB.
```

Author: Andrew Or <andrewo...@gmail.com>

Closes #2688 from andrewor14/cache-log-message and squashes the following 
commits:

28e33d6 [Andrew Or] Shy away from "unrolling"
5638c49 [Andrew Or] Grammar
39a0c28 [Andrew Or] Log more detail when unrolling a block fails


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/553737c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/553737c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/553737c6

Branch: refs/heads/master
Commit: 553737c6e6d5ffa3b52a9888444f4beece5c5b1a
Parents: bc87cc4
Author: Andrew Or <andrewo...@gmail.com>
Authored: Tue Oct 7 12:52:10 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Tue Oct 7 12:52:10 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |  2 -
 .../org/apache/spark/storage/MemoryStore.scala  | 45 +++++++++++++++++---
 2 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/553737c6/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index f8584b9..d89bb50 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           arr.iterator.asInstanceOf[Iterator[T]]
         case Right(it) =>
           // There is not enough space to cache this partition in memory
-          logWarning(s"Not enough space to cache partition $key in memory! " +
-            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
           val returnValues = it.asInstanceOf[Iterator[T]]
           if (putLevel.useDisk) {
             logWarning(s"Persisting partition $key to disk instead.")

http://git-wip-us.apache.org/repos/asf/spark/blob/553737c6/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 0a09c24..edbc729 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         PutResult(res.size, res.data, droppedBlocks)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
-        logWarning(s"Not enough space to store block $blockId in memory! " +
-          s"Free memory is $freeMemory bytes.")
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
@@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         Left(vector.toArray)
       } else {
         // We ran out of space while unrolling the values for this block
+        logUnrollFailureMessage(blockId, vector.estimateSize())
         Right(vector.iterator ++ values)
       }
 
@@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Reserve additional memory for unrolling blocks used by this thread.
    * Return whether the request is granted.
    */
-  private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+  def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
     accountingLock.synchronized {
       val granted = freeMemory > currentUnrollMemory + memory
       if (granted) {
@@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Release memory used by this thread for unrolling blocks.
    * If the amount is not specified, remove the current thread's allocation altogether.
    */
-  private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+  def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
       if (memory < 0) {
@@ -457,16 +456,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   /**
    * Return the amount of memory currently occupied for unrolling blocks across all threads.
    */
-  private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
+  def currentUnrollMemory: Long = accountingLock.synchronized {
     unrollMemoryMap.values.sum
   }
 
   /**
    * Return the amount of memory currently occupied for unrolling blocks by this thread.
    */
-  private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
+  def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
     unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
   }
+
+  /**
+   * Return the number of threads currently unrolling blocks.
+   */
+  def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }
+
+  /**
+   * Log information about current memory usage.
+   */
+  def logMemoryUsage(): Unit = {
+    val blocksMemory = currentMemory
+    val unrollMemory = currentUnrollMemory
+    val totalMemory = blocksMemory + unrollMemory
+    logInfo(
+      s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " +
+      s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " +
+      s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(totalMemory)}. 
" +
+      s"Storage limit = ${Utils.bytesToString(maxMemory)}."
+    )
+  }
+
+  /**
+   * Log a warning for failing to unroll a block.
+   *
+   * @param blockId ID of the block we are trying to unroll.
+   * @param finalVectorSize Final size of the vector before unrolling failed.
+   */
+  def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
+    logWarning(
+      s"Not enough space to cache $blockId in memory! " +
+      s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
+    )
+    logMemoryUsage()
+  }
 }
 
 private[spark] case class ResultWithDroppedBlocks(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to