Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e2a43d007 -> fd2da7b91


[SPARK-15260] Atomically resize memory pools (branch 1.6)

## What changes were proposed in this pull request?

(This is the branch-1.6 version of #13039)

When we acquire execution memory, we do a lot of work between shrinking the
storage memory pool and enlarging the execution memory pool. In particular, we
call memoryStore.evictBlocksToFreeSpace, which may do a lot of I/O and can
throw exceptions. If an exception is thrown, the storage pool has already been
shrunk but the execution pool has not yet been enlarged. The pool sizes on that
executor are therefore left inconsistent: they no longer sum to maxMemory.

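This patch performs eviction before touching either pool size. Only afterwards
does it shrink the storage pool and enlarge the execution pool, back-to-back,
with no eviction work in between. This makes the resize effectively atomic: an
exception during eviction no longer leaves one pool resized without the other.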

## How was this patch tested?

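Jenkins, plus a new regression test ("SPARK-15260: atomically resize memory
pools") in UnifiedMemoryManagerSuite.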

Author: Andrew Or <[email protected]>

Closes #13058 from andrewor14/safer-pool-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd2da7b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd2da7b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd2da7b9

Branch: refs/heads/branch-1.6
Commit: fd2da7b91e33e8fc994c4a6a0524831807f1324f
Parents: e2a43d0
Author: Andrew Or <[email protected]>
Authored: Wed May 11 17:25:57 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Wed May 11 17:25:57 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/memory/StorageMemoryPool.scala | 11 +++++-----
 .../spark/memory/UnifiedMemoryManager.scala     | 15 ++++++++-----
 .../spark/memory/MemoryManagerSuite.scala       | 15 +++++++++++++
 .../memory/UnifiedMemoryManagerSuite.scala      | 23 ++++++++++++++++++++
 4 files changed, 53 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fd2da7b9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 70af83b..89edaf5 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) 
extends MemoryPool(lock) w
   }
 
   /**
-   * Try to shrink the size of this storage memory pool by `spaceToFree` 
bytes. Return the number
-   * of bytes removed from the pool's capacity.
+   * Free space to shrink the size of this storage memory pool by 
`spaceToFree` bytes.
+   * Note: this method doesn't actually reduce the pool size but relies on the 
caller to do so.
+   *
+   * @return number of bytes to be removed from the pool's capacity.
    */
-  def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
-    // First, shrink the pool by reclaiming free memory:
+  def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
     val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
-    decrementPoolSize(spaceFreedByReleasingUnusedMemory)
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin 
evicting blocks:
@@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) 
extends MemoryPool(lock) w
       val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
       // When a block is released, BlockManager.dropFromMemory() calls 
releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to 
decrement the pool size.
-      decrementPoolSize(spaceFreedByEviction)
       spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
     } else {
       spaceFreedByReleasingUnusedMemory

http://git-wip-us.apache.org/repos/asf/spark/blob/fd2da7b9/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 829f054..802087c 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
     storageRegionSize,
     maxMemory - storageRegionSize) {
 
+  assertInvariant()
+
   // We always maintain this invariant:
-  assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
+  private def assertInvariant(): Unit = {
+    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
+  }
 
   override def maxStorageMemory: Long = synchronized {
     maxMemory - onHeapExecutionMemoryPool.memoryUsed
@@ -77,7 +81,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
       numBytes: Long,
       taskAttemptId: Long,
       memoryMode: MemoryMode): Long = synchronized {
-    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
+    assertInvariant()
     assert(numBytes >= 0)
     memoryMode match {
       case MemoryMode.ON_HEAP =>
@@ -99,9 +103,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
               math.max(storageMemoryPool.memoryFree, 
storageMemoryPool.poolSize - storageRegionSize)
             if (memoryReclaimableFromStorage > 0) {
               // Only reclaim as much space as is necessary and available:
-              val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+              val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
                 math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-              onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+              storageMemoryPool.decrementPoolSize(spaceToReclaim)
+              onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
             }
           }
         }
@@ -137,7 +142,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
-    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
+    assertInvariant()
     assert(numBytes >= 0)
     if (numBytes > maxStorageMemory) {
       // Fail fast if the block simply won't fit

http://git-wip-us.apache.org/repos/asf/spark/blob/fd2da7b9/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 555b640..6a195ef 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -77,6 +77,21 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite with BeforeAndAft
   }
 
   /**
+   * Make a mocked [[MemoryStore]] whose 
[[MemoryStore.evictBlocksToFreeSpace]] method is
+   * stubbed to always throw [[RuntimeException]].
+   */
+  protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
+    val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
+    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new 
Answer[Long] {
+      override def answer(invocation: InvocationOnMock): Long = {
+        throw new RuntimeException("bad memory store!")
+      }
+    })
+    mm.setMemoryStore(ms)
+    ms
+  }
+
+  /**
     * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that 
releases storage memory.
     *
     * This is a significant simplification of the real method, which actually 
drops existing

http://git-wip-us.apache.org/repos/asf/spark/blob/fd2da7b9/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 6cc4859..46b6916 100644
--- 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -255,4 +255,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     assert(evictedBlocks.nonEmpty)
   }
 
+  test("SPARK-15260: atomically resize memory pools") {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "0")
+      .set("spark.testing.memory", "1000")
+    val mm = UnifiedMemoryManager(conf, numCores = 2)
+    makeBadMemoryStore(mm)
+    val memoryMode = MemoryMode.ON_HEAP
+    // Acquire 1000 then release 600 bytes of storage memory, leaving the
+    // storage memory pool at 1000 bytes but only 400 bytes of which are used.
+    assert(mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
+    mm.releaseStorageMemory(600L)
+    // Before the fix for SPARK-15260, we would first shrink the storage pool 
by the amount of
+    // unused storage memory (600 bytes), try to evict blocks, then enlarge 
the execution pool
+    // by the same amount. If the eviction threw an exception, then we would 
shrink one pool
+    // without enlarging the other, resulting in an assertion failure.
+    intercept[RuntimeException] {
+      mm.acquireExecutionMemory(1000L, 0, memoryMode)
+    }
+    val assertInvariant = PrivateMethod[Unit]('assertInvariant)
+    mm.invokePrivate[Unit](assertInvariant())
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to