Repository: spark Updated Branches: refs/heads/master 2e52e4f81 -> c68344400
[SPARK-4071] Unroll fails silently if BlockManager is small In tests, we may want to have BlockManagers of size < 1MB (spark.storage.unrollMemoryThreshold). However, these BlockManagers are useless because we can't unroll anything in them ever. At the very least we need to log a warning. tdas Author: Andrew Or <[email protected]> Closes #2917 from andrewor14/unroll-safely-logging and squashes the following commits: 38947e3 [Andrew Or] Warn against starting a block manager that's too small fd621b4 [Andrew Or] Warn against failure to reserve initial memory threshold Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6834440 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6834440 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6834440 Branch: refs/heads/master Commit: c6834440085b79f6d3e011f9e55ffd672be855fe Parents: 2e52e4f Author: Andrew Or <[email protected]> Authored: Sat Oct 25 20:07:44 2014 -0700 Committer: Josh Rosen <[email protected]> Committed: Sat Oct 25 20:07:44 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/storage/MemoryStore.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c6834440/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index edbc729..71305a4 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -56,6 +56,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) (maxMemory * unrollFraction).toLong } + // Initial memory to request before unrolling any block + private val unrollMemoryThreshold: Long = + conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) + + if (maxMemory < unrollMemoryThreshold) { + logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " + + s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " + + s"memory. Please configure Spark with more memory.") + } + logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. */ @@ -213,7 +223,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing. - val initialMemoryThreshold = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) + val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory val memoryCheckPeriod = 16 // Memory currently reserved by this thread for this particular unrolling operation @@ -228,6 +238,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Request enough memory to begin unrolling keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold) + if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") + } + // Unroll this block safely, checking whether we have exceeded our threshold periodically try { while (values.hasNext && keepUnrolling) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
