Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c0d4acce5 -> d3c79b77a


[SPARK-21090][CORE] Optimize the unified memory manager code

## What changes were proposed in this pull request?
1.In `acquireStorageMemory`, when the Memory Mode is OFF_HEAP ,the 
`maxOffHeapMemory` should be modified to `maxOffHeapStorageMemory`. after this 
PR,it will same as ON_HEAP Memory Mode.
Because when acquire memory is between `maxOffHeapStorageMemory` and 
`maxOffHeapMemory`,it will fail surely, so if acquire memory is greater than  
`maxOffHeapStorageMemory`(not greater than `maxOffHeapMemory`),we should fail 
fast.
2. Borrow memory from execution, `numBytes` modified to `numBytes - 
storagePool.memoryFree` will be more reasonable.
Because we just acquire `(numBytes - storagePool.memoryFree)`, unnecessary 
borrowed `numBytes` from execution

## How was this patch tested?
added unit test case

Author: liuxian <[email protected]>

Closes #18296 from 10110346/wip-lx-0614.

(cherry picked from commit 112bd9bfc5b9729f6f86518998b5d80c5e79fe5e)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3c79b77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3c79b77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3c79b77

Branch: refs/heads/branch-2.2
Commit: d3c79b77ab6df5f20d8fc07db089051a064ecb16
Parents: c0d4acc
Author: liuxian <[email protected]>
Authored: Mon Jun 19 11:46:58 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Jun 19 11:52:19 2017 +0800

----------------------------------------------------------------------
 .../spark/memory/UnifiedMemoryManager.scala     |  5 +--
 .../spark/memory/MemoryManagerSuite.scala       |  2 +-
 .../memory/UnifiedMemoryManagerSuite.scala      | 32 ++++++++++++++++++++
 3 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3c79b77/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index fea2808..df19355 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -160,7 +160,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
       case MemoryMode.OFF_HEAP => (
         offHeapExecutionMemoryPool,
         offHeapStorageMemoryPool,
-        maxOffHeapMemory)
+        maxOffHeapStorageMemory)
     }
     if (numBytes > maxMemory) {
       // Fail fast if the block simply won't fit
@@ -171,7 +171,8 @@ private[spark] class UnifiedMemoryManager private[memory] (
     if (numBytes > storagePool.memoryFree) {
       // There is not enough free memory in the storage pool, so try to borrow 
free memory from
       // the execution pool.
-      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, 
numBytes)
+      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
+        numBytes - storagePool.memoryFree)
       executionPool.decrementPoolSize(memoryBorrowedFromExecution)
       storagePool.incrementPoolSize(memoryBorrowedFromExecution)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d3c79b77/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index eb2b3ff..85eeb50 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -117,7 +117,7 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite with BeforeAndAft
         evictBlocksToFreeSpaceCalled.set(numBytesToFree)
         if (numBytesToFree <= mm.storageMemoryUsed) {
           // We can evict enough blocks to fulfill the request for space
-          mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP)
+          mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode)
           evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, 
numBytesToFree, 0L))
           numBytesToFree
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d3c79b77/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index c821054..02b04cd 100644
--- 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -303,4 +303,36 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     mm.invokePrivate[Unit](assertInvariants())
   }
 
+  test("not enough free memory in the storage pool --OFF_HEAP") {
+    val conf = new SparkConf()
+      .set("spark.memory.offHeap.size", "1000")
+      .set("spark.testing.memory", "1000")
+      .set("spark.memory.offHeap.enabled", "true")
+    val taskAttemptId = 0L
+    val mm = UnifiedMemoryManager(conf, numCores = 1)
+    val ms = makeMemoryStore(mm)
+    val memoryMode = MemoryMode.OFF_HEAP
+
+    assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 400L)
+    assert(mm.storageMemoryUsed === 0L)
+    assert(mm.executionMemoryUsed === 400L)
+
+    // Fail fast
+    assert(!mm.acquireStorageMemory(dummyBlock, 700L, memoryMode))
+    assert(mm.storageMemoryUsed === 0L)
+
+    assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
+    assert(mm.storageMemoryUsed === 100L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+
+    // Borrow 50 from execution memory
+    assert(mm.acquireStorageMemory(dummyBlock, 450L, memoryMode))
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+    assert(mm.storageMemoryUsed === 550L)
+
+    // Borrow 50 from execution memory and evict 50 to free space
+    assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
+    assertEvictBlocksToFreeSpaceCalled(ms, 50)
+    assert(mm.storageMemoryUsed === 600L)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to