Repository: spark Updated Branches: refs/heads/branch-2.2 c0d4acce5 -> d3c79b77a
[SPARK-21090][CORE] Optimize the unified memory manager code ## What changes were proposed in this pull request? 1.In `acquireStorageMemory`, when the Memory Mode is OFF_HEAP ,the `maxOffHeapMemory` should be modified to `maxOffHeapStorageMemory`. after this PR,it will same as ON_HEAP Memory Mode. Because when acquire memory is between `maxOffHeapStorageMemory` and `maxOffHeapMemory`,it will fail surely, so if acquire memory is greater than `maxOffHeapStorageMemory`(not greater than `maxOffHeapMemory`),we should fail fast. 2. Borrow memory from execution, `numBytes` modified to `numBytes - storagePool.memoryFree` will be more reasonable. Because we just acquire `(numBytes - storagePool.memoryFree)`, unnecessary borrowed `numBytes` from execution ## How was this patch tested? added unit test case Author: liuxian <[email protected]> Closes #18296 from 10110346/wip-lx-0614. (cherry picked from commit 112bd9bfc5b9729f6f86518998b5d80c5e79fe5e) Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3c79b77 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3c79b77 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3c79b77 Branch: refs/heads/branch-2.2 Commit: d3c79b77ab6df5f20d8fc07db089051a064ecb16 Parents: c0d4acc Author: liuxian <[email protected]> Authored: Mon Jun 19 11:46:58 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Mon Jun 19 11:52:19 2017 +0800 ---------------------------------------------------------------------- .../spark/memory/UnifiedMemoryManager.scala | 5 +-- .../spark/memory/MemoryManagerSuite.scala | 2 +- .../memory/UnifiedMemoryManagerSuite.scala | 32 ++++++++++++++++++++ 3 files changed, 36 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d3c79b77/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index fea2808..df19355 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -160,7 +160,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( case MemoryMode.OFF_HEAP => ( offHeapExecutionMemoryPool, offHeapStorageMemoryPool, - maxOffHeapMemory) + maxOffHeapStorageMemory) } if (numBytes > maxMemory) { // Fail fast if the block simply won't fit @@ -171,7 +171,8 @@ private[spark] class UnifiedMemoryManager private[memory] ( if (numBytes > storagePool.memoryFree) { // There is not enough free memory in the storage pool, so try to borrow free memory from // the execution pool. - val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes) + val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, + numBytes - storagePool.memoryFree) executionPool.decrementPoolSize(memoryBorrowedFromExecution) storagePool.incrementPoolSize(memoryBorrowedFromExecution) } http://git-wip-us.apache.org/repos/asf/spark/blob/d3c79b77/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index eb2b3ff..85eeb50 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -117,7 +117,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft evictBlocksToFreeSpaceCalled.set(numBytesToFree) if (numBytesToFree <= mm.storageMemoryUsed) { // We can evict enough blocks to fulfill the request for space - mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP) + mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode) evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)) numBytesToFree } else { http://git-wip-us.apache.org/repos/asf/spark/blob/d3c79b77/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index c821054..02b04cd 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -303,4 +303,36 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes mm.invokePrivate[Unit](assertInvariants()) } + test("not enough free memory in the storage pool --OFF_HEAP") { + val conf = new SparkConf() + .set("spark.memory.offHeap.size", "1000") + .set("spark.testing.memory", "1000") + .set("spark.memory.offHeap.enabled", "true") + val taskAttemptId = 0L + val mm = UnifiedMemoryManager(conf, numCores = 1) + val ms = makeMemoryStore(mm) + val memoryMode = MemoryMode.OFF_HEAP + + assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 400L) + assert(mm.storageMemoryUsed === 0L) + assert(mm.executionMemoryUsed === 400L) + + // Fail fast + assert(!mm.acquireStorageMemory(dummyBlock, 700L, memoryMode)) + assert(mm.storageMemoryUsed === 0L) + + assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode)) + assert(mm.storageMemoryUsed === 100L) + assertEvictBlocksToFreeSpaceNotCalled(ms) + + // Borrow 50 from execution memory + assert(mm.acquireStorageMemory(dummyBlock, 450L, memoryMode)) + assertEvictBlocksToFreeSpaceNotCalled(ms) + assert(mm.storageMemoryUsed === 550L) + + // Borrow 50 from execution memory and evict 50 to free space + assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode)) + assertEvictBlocksToFreeSpaceCalled(ms, 50) + assert(mm.storageMemoryUsed === 600L) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
