Repository: spark
Updated Branches:
  refs/heads/master 442a7715a -> aec5ea000


[SPARK-12165][SPARK-12189] Fix bugs in eviction of storage memory by execution

This patch fixes a bug in the eviction of storage memory by execution.

## The bug:

In general, execution should be able to evict storage memory when the total 
storage memory usage is greater than `maxMemory * 
spark.memory.storageFraction`. Due to a bug, however, Spark might wind up 
evicting no storage memory in certain cases where the storage memory usage was 
between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For 
example, here is a regression test which illustrates the bug:

```scala
    val maxMemory = 1000L
    val taskAttemptId = 0L
    val (mm, ms) = makeThings(maxMemory)
    // Since we used the default storage fraction (0.5), we should be able to 
allocate 500 bytes
    // of storage memory which are immune to eviction by execution memory 
pressure.

    // Acquire enough storage memory to exceed the storage region size
    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
    assertEvictBlocksToFreeSpaceNotCalled(ms)
    assert(mm.executionMemoryUsed === 0L)
    assert(mm.storageMemoryUsed === 750L)

    // At this point, storage is using 250 more bytes of memory than it is 
guaranteed, so execution
    // should be able to reclaim up to 250 bytes of storage memory.
    // Therefore, execution should now be able to require up to 500 bytes of 
memory:
    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 500L) // <--- fails by only returning 250L
    assert(mm.storageMemoryUsed === 500L)
    assert(mm.executionMemoryUsed === 500L)
    assertEvictBlocksToFreeSpaceCalled(ms, 250L)
```

The problem relates to the control flow / interaction between 
`StorageMemoryPool.shrinkPoolToReclaimSpace()` and 
`MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of 
execution memory, the `UnifiedMemoryManager` discovers that it will need to 
reclaim 250 bytes of memory from storage, so it calls 
`StorageMemoryPool.shrinkPoolToReclaimSpace(250L)`. This method, in turn, calls 
`MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks 
whether the requested space is less than `maxStorageMemory - 
storageMemoryUsed`, which will be true if there is any free execution memory 
because it turns out that `MemoryStore.maxStorageMemory = (maxMemory - 
onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.

The control flow here is somewhat confusing (it grew to be messy / confusing 
over time / as a result of the merging / refactoring of several components). In 
the pre-Spark 1.6 code, `ensureFreeSpace` was called directly by the 
`MemoryStore` itself, whereas in 1.6 it's involved in a confusing control flow 
where `MemoryStore` calls `MemoryManager.acquireStorageMemory`, which then 
calls back into `MemoryStore.ensureFreeSpace`, which, in turn, calls 
`MemoryManager.freeStorageMemory`.

## The solution:

The solution implemented in this patch is to remove the confusing circular 
control flow between `MemoryManager` and `MemoryStore`, making the storage 
memory acquisition process much more linear / straightforward. The key changes:

- Remove a layer of inheritance which made the memory manager code harder to 
understand (53841174760a24a0df3eb1562af1f33dbe340eb9).
- Move some bounds checks earlier in the call chain 
(13ba7ada77f87ef1ec362aec35c89a924e6987cb).
- Refactor `ensureFreeSpace()` so that the part which evicts blocks can be 
called independently from the part which checks whether there is enough free 
space to avoid eviction (7c68ca09cb1b12f157400866983f753ac863380e).
- Realize that this lets us remove a layer of overloads from `ensureFreeSpace` 
(eec4f6c87423d5e482b710e098486b3bbc4daf06).
- Realize that `ensureFreeSpace()` can simply be replaced with an 
`evictBlocksToFreeSpace()` method which is called [after we've already figured 
out](https://github.com/apache/spark/blob/2dc842aea82c8895125d46a00aa43dfb0d121de9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala#L88)
 how much memory needs to be reclaimed via eviction; 
(2dc842aea82c8895125d46a00aa43dfb0d121de9).

Along the way, I fixed some problems with the mocks in `MemoryManagerSuite`: 
the old mocks would 
[unconditionally](https://github.com/apache/spark/blob/80a824d36eec9d9a9f092ee1741453851218ec73/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L84)
 report that a block had been evicted even if there was enough space in the 
storage pool such that eviction would be avoided.

I also fixed a problem where `StorageMemoryPool._memoryUsed` might become 
negative due to freed memory being double-counted when excution evicts storage. 
The problem was that `StorageMemoryPoolshrinkPoolToFreeSpace` would [decrement 
`_memoryUsed`](https://github.com/apache/spark/commit/7c68ca09cb1b12f157400866983f753ac863380e#diff-935c68a9803be144ed7bafdd2f756a0fL133)
 even though `StorageMemoryPool.freeMemory` had already decremented it as each 
evicted block was freed. See SPARK-12189 for details.

Author: Josh Rosen <joshro...@databricks.com>
Author: Andrew Or <and...@databricks.com>

Closes #10170 from JoshRosen/SPARK-12165.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aec5ea00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aec5ea00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aec5ea00

Branch: refs/heads/master
Commit: aec5ea000ebb8921f42f006b694ef26f5df67d83
Parents: 442a771
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Dec 9 11:39:59 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Dec 9 11:39:59 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/memory/MemoryManager.scala |  11 +-
 .../spark/memory/StaticMemoryManager.scala      |  37 ++++-
 .../apache/spark/memory/StorageMemoryPool.scala |  37 ++---
 .../spark/memory/UnifiedMemoryManager.scala     |   8 +-
 .../org/apache/spark/storage/MemoryStore.scala  |  76 +++-------
 .../spark/memory/MemoryManagerSuite.scala       | 137 ++++++++++---------
 .../spark/memory/StaticMemoryManagerSuite.scala |  52 +++----
 .../memory/UnifiedMemoryManagerSuite.scala      |  76 ++++++----
 8 files changed, 230 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aec5ea00/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index ceb8ea4..ae9e1ac 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -77,9 +77,7 @@ private[spark] abstract class MemoryManager(
   def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
-    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
-  }
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
 
   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing 
ones if necessary.
@@ -109,12 +107,7 @@ private[spark] abstract class MemoryManager(
   def acquireExecutionMemory(
       numBytes: Long,
       taskAttemptId: Long,
-      memoryMode: MemoryMode): Long = synchronized {
-    memoryMode match {
-      case MemoryMode.ON_HEAP => 
onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
-      case MemoryMode.OFF_HEAP => 
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
-    }
-  }
+      memoryMode: MemoryMode): Long
 
   /**
    * Release numBytes of execution memory belonging to the given task.

http://git-wip-us.apache.org/repos/asf/spark/blob/aec5ea00/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 12a0943..3554b55 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -49,19 +49,50 @@ private[spark] class StaticMemoryManager(
   }
 
   // Max number of bytes worth of blocks to evict when unrolling
-  private val maxMemoryToEvictForUnroll: Long = {
+  private val maxUnrollMemory: Long = {
     (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 
0.2)).toLong
   }
 
+  override def acquireStorageMemory(
+      blockId: BlockId,
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
+    if (numBytes > maxStorageMemory) {
+      // Fail fast if the block simply won't fit
+      logInfo(s"Will not store $blockId as the required space ($numBytes 
bytes) exceeds our " +
+        s"memory limit ($maxStorageMemory bytes)")
+      false
+    } else {
+      storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+    }
+  }
+
   override def acquireUnrollMemory(
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
     val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
-    val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - 
currentUnrollMemory)
-    val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
+    val freeMemory = storageMemoryPool.memoryFree
+    // When unrolling, we will use all of the existing free memory, and, if 
necessary,
+    // some extra space freed from evicting cached blocks. We must place a cap 
on the
+    // amount of memory to be evicted by unrolling, however, otherwise 
unrolling one
+    // big block can blow away the entire cache.
+    val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory 
- freeMemory)
+    // Keep it within the range 0 <= X <= maxNumBytesToFree
+    val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - 
freeMemory))
     storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, 
evictedBlocks)
   }
+
+  private[memory]
+  override def acquireExecutionMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      memoryMode: MemoryMode): Long = synchronized {
+    memoryMode match {
+      case MemoryMode.ON_HEAP => 
onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+      case MemoryMode.OFF_HEAP => 
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/aec5ea00/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index fc4f035..70af83b 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -65,7 +65,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends 
MemoryPool(lock) w
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
lock.synchronized {
-    acquireMemory(blockId, numBytes, numBytes, evictedBlocks)
+    val numBytesToFree = math.max(0, numBytes - memoryFree)
+    acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
   }
 
   /**
@@ -73,7 +74,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends 
MemoryPool(lock) w
    *
    * @param blockId the ID of the block we are acquiring storage memory for
    * @param numBytesToAcquire the size of this block
-   * @param numBytesToFree the size of space to be freed through evicting 
blocks
+   * @param numBytesToFree the amount of space to be freed through evicting 
blocks
    * @return whether all N bytes were successfully granted.
    */
   def acquireMemory(
@@ -84,16 +85,18 @@ private[memory] class StorageMemoryPool(lock: Object) 
extends MemoryPool(lock) w
     assert(numBytesToAcquire >= 0)
     assert(numBytesToFree >= 0)
     assert(memoryUsed <= poolSize)
-    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
-    // Register evicted blocks, if any, with the active task metrics
-    Option(TaskContext.get()).foreach { tc =>
-      val metrics = tc.taskMetrics()
-      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, 
BlockStatus)]())
-      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+    if (numBytesToFree > 0) {
+      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, 
evictedBlocks)
+      // Register evicted blocks, if any, with the active task metrics
+      Option(TaskContext.get()).foreach { tc =>
+        val metrics = tc.taskMetrics()
+        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, 
BlockStatus)]())
+        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+      }
     }
     // NOTE: If the memory store evicts blocks, then those evictions will 
synchronously call
-    // back into this StorageMemoryPool in order to free. Therefore, these 
variables should have
-    // been updated.
+    // back into this StorageMemoryPool in order to free memory. Therefore, 
these variables
+    // should have been updated.
     val enoughMemory = numBytesToAcquire <= memoryFree
     if (enoughMemory) {
       _memoryUsed += numBytesToAcquire
@@ -121,18 +124,20 @@ private[memory] class StorageMemoryPool(lock: Object) 
extends MemoryPool(lock) w
    */
   def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
     // First, shrink the pool by reclaiming free memory:
-    val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree)
+    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
     decrementPoolSize(spaceFreedByReleasingUnusedMemory)
-    if (spaceFreedByReleasingUnusedMemory == spaceToFree) {
-      spaceFreedByReleasingUnusedMemory
-    } else {
+    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
+    if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin 
evicting blocks:
       val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-      memoryStore.ensureFreeSpace(spaceToFree - 
spaceFreedByReleasingUnusedMemory, evictedBlocks)
+      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, 
evictedBlocks)
       val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
-      _memoryUsed -= spaceFreedByEviction
+      // When a block is released, BlockManager.dropFromMemory() calls 
releaseMemory(), so we do
+      // not need to decrement _memoryUsed here. However, we do need to 
decrement the pool size.
       decrementPoolSize(spaceFreedByEviction)
       spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
+    } else {
+      spaceFreedByReleasingUnusedMemory
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aec5ea00/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0f1ea9a..0b9f6a9 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -100,7 +100,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
       case MemoryMode.OFF_HEAP =>
         // For now, we only support on-heap caching of data, so we do not need 
to interact with
         // the storage pool when allocating off-heap memory. This will change 
in the future, though.
-        super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
+        offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
     }
   }
 
@@ -110,6 +110,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
synchronized {
     assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == 
maxMemory)
     assert(numBytes >= 0)
+    if (numBytes > maxStorageMemory) {
+      // Fail fast if the block simply won't fit
+      logInfo(s"Will not store $blockId as the required space ($numBytes 
bytes) exceeds our " +
+        s"memory limit ($maxStorageMemory bytes)")
+      return false
+    }
     if (numBytes > storageMemoryPool.memoryFree) {
       // There is not enough free memory in the storage pool, so try to borrow 
free memory from
       // the execution pool.

http://git-wip-us.apache.org/repos/asf/spark/blob/aec5ea00/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 4dbac38..bdab8c2 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -406,85 +406,41 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
   }
 
   /**
-   * Try to free up a given amount of space by evicting existing blocks.
-   *
-   * @param space the amount of memory to free, in bytes
-   * @param droppedBlocks a holder for blocks evicted in the process
-   * @return whether the requested free space is freed.
-   */
-  private[spark] def ensureFreeSpace(
-      space: Long,
-      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    ensureFreeSpace(None, space, droppedBlocks)
-  }
-
-  /**
-   * Try to free up a given amount of space to store a block by evicting 
existing ones.
-   *
-   * @param space the amount of memory to free, in bytes
-   * @param droppedBlocks a holder for blocks evicted in the process
-   * @return whether the requested free space is freed.
-   */
-  private[spark] def ensureFreeSpace(
-      blockId: BlockId,
-      space: Long,
-      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    ensureFreeSpace(Some(blockId), space, droppedBlocks)
-  }
-
-  /**
-   * Try to free up a given amount of space to store a particular block, but 
can fail if
-   * either the block is bigger than our memory or it would require replacing 
another block
-   * from the same RDD (which leads to a wasteful cyclic replacement pattern 
for RDDs that
-   * don't fit into memory that we want to avoid).
-   *
-   * @param blockId the ID of the block we are freeing space for, if any
-   * @param space the size of this block
-   * @param droppedBlocks a holder for blocks evicted in the process
-   * @return whether the requested free space is freed.
-   */
-  private def ensureFreeSpace(
+    * Try to evict blocks to free up a given amount of space to store a 
particular block.
+    * Can fail if either the block is bigger than our memory or it would 
require replacing
+    * another block from the same RDD (which leads to a wasteful cyclic 
replacement pattern for
+    * RDDs that don't fit into memory that we want to avoid).
+    *
+    * @param blockId the ID of the block we are freeing space for, if any
+    * @param space the size of this block
+    * @param droppedBlocks a holder for blocks evicted in the process
+    * @return whether the requested free space is freed.
+    */
+  private[spark] def evictBlocksToFreeSpace(
       blockId: Option[BlockId],
       space: Long,
       droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+    assert(space > 0)
     memoryManager.synchronized {
-      val freeMemory = maxMemory - memoryUsed
+      var freedMemory = 0L
       val rddToAdd = blockId.flatMap(getRddId)
       val selectedBlocks = new ArrayBuffer[BlockId]
-      var selectedMemory = 0L
-
-      logInfo(s"Ensuring $space bytes of free space " +
-        blockId.map { id => s"for block $id" }.getOrElse("") +
-        s"(free: $freeMemory, max: $maxMemory)")
-
-      // Fail fast if the block simply won't fit
-      if (space > maxMemory) {
-        logInfo("Will not " + blockId.map { id => s"store $id" 
}.getOrElse("free memory") +
-          s" as the required space ($space bytes) exceeds our memory limit 
($maxMemory bytes)")
-        return false
-      }
-
-      // No need to evict anything if there is already enough free space
-      if (freeMemory >= space) {
-        return true
-      }
-
       // This is synchronized to ensure that the set of entries is not changed
       // (because of getValue or getBytes) while traversing the iterator, as 
that
       // can lead to exceptions.
       entries.synchronized {
         val iterator = entries.entrySet().iterator()
-        while (freeMemory + selectedMemory < space && iterator.hasNext) {
+        while (freedMemory < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
           if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
             selectedBlocks += blockId
-            selectedMemory += pair.getValue.size
+            freedMemory += pair.getValue.size
           }
         }
       }
 
-      if (freeMemory + selectedMemory >= space) {
+      if (freedMemory >= space) {
         logInfo(s"${selectedBlocks.size} blocks selected for dropping")
         for (blockId <- selectedBlocks) {
           val entry = entries.synchronized { entries.get(blockId) }

http://git-wip-us.apache.org/repos/asf/spark/blob/aec5ea00/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index f55d435..555b640 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -24,9 +24,10 @@ import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
 
 import org.mockito.Matchers.{any, anyLong}
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfterEach
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
@@ -36,105 +37,105 @@ import org.apache.spark.storage.{BlockId, BlockStatus, 
MemoryStore, StorageLevel
 /**
  * Helper trait for sharing code among [[MemoryManager]] tests.
  */
-private[memory] trait MemoryManagerSuite extends SparkFunSuite {
+private[memory] trait MemoryManagerSuite extends SparkFunSuite with 
BeforeAndAfterEach {
 
-  import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
+  protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]
+
+  import MemoryManagerSuite.DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED
 
   // Note: Mockito's verify mechanism does not provide a way to reset method 
call counts
   // without also resetting stubbed methods. Since our test code relies on the 
latter,
-  // we need to use our own variable to track invocations of `ensureFreeSpace`.
+  // we need to use our own variable to track invocations of 
`evictBlocksToFreeSpace`.
 
   /**
-   * The amount of free space requested in the last call to 
[[MemoryStore.ensureFreeSpace]]
+   * The amount of space requested in the last call to 
[[MemoryStore.evictBlocksToFreeSpace]].
    *
-   * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared 
when the test
-   * code makes explicit assertions on this variable through 
[[assertEnsureFreeSpaceCalled]].
+   * This set whenever [[MemoryStore.evictBlocksToFreeSpace]] is called, and 
cleared when the test
+   * code makes explicit assertions on this variable through
+   * [[assertEvictBlocksToFreeSpaceCalled]].
    */
-  private val ensureFreeSpaceCalled = new 
AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+  private val evictBlocksToFreeSpaceCalled = new AtomicLong(0)
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    evictedBlocks.clear()
+    evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED)
+  }
 
   /**
-   * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] 
method is stubbed.
+   * Make a mocked [[MemoryStore]] whose 
[[MemoryStore.evictBlocksToFreeSpace]] method is stubbed.
    *
-   * This allows our test code to release storage memory when 
[[MemoryStore.ensureFreeSpace]]
-   * is called without relying on [[org.apache.spark.storage.BlockManager]] 
and all of its
-   * dependencies.
+   * This allows our test code to release storage memory when these methods 
are called
+   * without relying on [[org.apache.spark.storage.BlockManager]] and all of 
its dependencies.
    */
   protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
-    val ms = mock(classOf[MemoryStore])
-    when(ms.ensureFreeSpace(anyLong(), 
any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
-    when(ms.ensureFreeSpace(any(), anyLong(), 
any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
+    val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
+    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
+      .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
     mm.setMemoryStore(ms)
     ms
   }
 
   /**
-   * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the 
right arguments.
-   */
-  private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): 
Answer[Boolean] = {
+    * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that 
releases storage memory.
+    *
+    * This is a significant simplification of the real method, which actually 
drops existing
+    * blocks based on the size of each block. Instead, here we simply release 
as many bytes
+    * as needed to ensure the requested amount of free space. This allows us 
to set up the
+    * test without relying on the [[org.apache.spark.storage.BlockManager]], 
which brings in
+    * many other dependencies.
+    *
+    * Every call to this method will set a global variable, 
[[evictBlocksToFreeSpaceCalled]], that
+    * records the number of bytes this is called with. This variable is 
expected to be cleared
+    * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
+    */
+  private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] 
= {
     new Answer[Boolean] {
       override def answer(invocation: InvocationOnMock): Boolean = {
         val args = invocation.getArguments
-        require(args.size > numBytesPos, s"bad test: expected >$numBytesPos 
arguments " +
-          s"in ensureFreeSpace, found ${args.size}")
-        require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected 
ensureFreeSpace " +
-          s"argument at index $numBytesPos to be a Long: ${args.mkString(", 
")}")
-        val numBytes = args(numBytesPos).asInstanceOf[Long]
-        val success = mockEnsureFreeSpace(mm, numBytes)
-        if (success) {
+        val numBytesToFree = args(1).asInstanceOf[Long]
+        assert(numBytesToFree > 0)
+        require(evictBlocksToFreeSpaceCalled.get() === 
DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
+          "bad test: evictBlocksToFreeSpace() variable was not reset")
+        evictBlocksToFreeSpaceCalled.set(numBytesToFree)
+        if (numBytesToFree <= mm.storageMemoryUsed) {
+          // We can evict enough blocks to fulfill the request for space
+          mm.releaseStorageMemory(numBytesToFree)
           args.last.asInstanceOf[mutable.Buffer[(BlockId, 
BlockStatus)]].append(
-            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
+            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 
0L)))
+          // We need to add this call so that that the suite-level 
`evictedBlocks` is updated when
+          // execution evicts storage; in that case, args.last will not be 
equal to evictedBlocks
+          // because it will be a temporary buffer created inside of the 
MemoryManager rather than
+          // being passed in by the test code.
+          if (!(evictedBlocks eq args.last)) {
+            evictedBlocks.append(
+              (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 
0L)))
+          }
+          true
+        } else {
+          // No blocks were evicted because eviction would not free enough 
space.
+          false
         }
-        success
-      }
-    }
-  }
-
-  /**
-   * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases 
storage memory.
-   *
-   * This is a significant simplification of the real method, which actually 
drops existing
-   * blocks based on the size of each block. Instead, here we simply release 
as many bytes
-   * as needed to ensure the requested amount of free space. This allows us to 
set up the
-   * test without relying on the [[org.apache.spark.storage.BlockManager]], 
which brings in
-   * many other dependencies.
-   *
-   * Every call to this method will set a global variable, 
[[ensureFreeSpaceCalled]], that
-   * records the number of bytes this is called with. This variable is 
expected to be cleared
-   * by the test code later through [[assertEnsureFreeSpaceCalled]].
-   */
-  private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean 
= mm.synchronized {
-    require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
-      "bad test: ensure free space variable was not reset")
-    // Record the number of bytes we freed this call
-    ensureFreeSpaceCalled.set(numBytes)
-    if (numBytes <= mm.maxStorageMemory) {
-      def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
-      val spaceToRelease = numBytes - freeMemory
-      if (spaceToRelease > 0) {
-        mm.releaseStorageMemory(spaceToRelease)
       }
-      freeMemory >= numBytes
-    } else {
-      // We attempted to free more bytes than our max allowable memory
-      false
     }
   }
 
   /**
-   * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given 
parameters.
+   * Assert that [[MemoryStore.evictBlocksToFreeSpace]] is called with the 
given parameters.
    */
-  protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): 
Unit = {
-    assert(ensureFreeSpaceCalled.get() === numBytes,
-      s"expected ensure free space to be called with $numBytes")
-    ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+  protected def assertEvictBlocksToFreeSpaceCalled(ms: MemoryStore, numBytes: 
Long): Unit = {
+    assert(evictBlocksToFreeSpaceCalled.get() === numBytes,
+      s"expected evictBlocksToFreeSpace() to be called with $numBytes")
+    evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED)
   }
 
   /**
-   * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called.
+   * Assert that [[MemoryStore.evictBlocksToFreeSpace]] is NOT called.
    */
-  protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
-    assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
-      "ensure free space should not have been called!")
+  protected def assertEvictBlocksToFreeSpaceNotCalled[T](ms: MemoryStore): 
Unit = {
+    assert(evictBlocksToFreeSpaceCalled.get() === 
DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
+      "evictBlocksToFreeSpace() should not have been called!")
+    assert(evictedBlocks.isEmpty)
   }
 
   /**
@@ -291,5 +292,5 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite {
 }
 
 private object MemoryManagerSuite {
-  private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L
+  private val DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED = -1L
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aec5ea00/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 54cb28c..6700b94 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -17,16 +17,13 @@
 
 package org.apache.spark.memory
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.mockito.Mockito.when
 
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
TestBlockId}
+import org.apache.spark.storage.{MemoryStore, TestBlockId}
 
 class StaticMemoryManagerSuite extends MemoryManagerSuite {
   private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
-  private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
 
   /**
    * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class 
dependencies.
@@ -85,33 +82,38 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
-    // `ensureFreeSpace` should be called with the number of bytes requested
-    assertEnsureFreeSpaceCalled(ms, 10L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 10L)
+
     assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 100L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire more than the max, not granted
     assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, 
evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire up to the max, requests after this are still granted due to LRU 
eviction
     assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 1000L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 110L)
     assert(mm.storageMemoryUsed === 1000L)
     assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 1L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 1L)
+    assert(evictedBlocks.nonEmpty)
+    evictedBlocks.clear()
+    // Note: We evicted 1 byte to put another 1-byte block in, so the storage 
memory used remains at
+    // 1000 bytes. This is different from real behavior, where the 1-byte 
block would have evicted
+    // the 1000-byte block entirely. This is set up differently so we can 
write finer-grained tests.
     assert(mm.storageMemoryUsed === 1000L)
     mm.releaseStorageMemory(800L)
     assert(mm.storageMemoryUsed === 200L)
     // Acquire after release
     assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 1L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 201L)
     mm.releaseAllStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 1L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 1L)
     // Release beyond what was acquired
     mm.releaseStorageMemory(100L)
@@ -133,7 +135,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     assert(mm.executionMemoryUsed === 200L)
     // Only storage memory should increase
     assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 50L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 50L)
     assert(mm.executionMemoryUsed === 200L)
     // Only execution memory should be released
@@ -151,21 +153,25 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite 
{
     val dummyBlock = TestBlockId("lonely water")
     val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
     assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 100L)
+    when(ms.currentUnrollMemory).thenReturn(100L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 100L)
     mm.releaseUnrollMemory(40L)
     assert(mm.storageMemoryUsed === 60L)
     when(ms.currentUnrollMemory).thenReturn(60L)
-    assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
+    assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+    assert(mm.storageMemoryUsed === 860L)
     // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 
bytes.
-    // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 
340 bytes.
-    assertEnsureFreeSpaceCalled(ms, 340L)
-    assert(mm.storageMemoryUsed === 560L)
-    when(ms.currentUnrollMemory).thenReturn(560L)
-    assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
-    assert(mm.storageMemoryUsed === 560L)
-    // We already have 560 bytes > the max unroll space of 400 bytes, so no 
bytes are freed
-    assertEnsureFreeSpaceCalled(ms, 0L)
+    // Since we already occupy 60 bytes, we will try to evict only 400 - 60 = 
340 bytes.
+    assert(mm.acquireUnrollMemory(dummyBlock, 240L, evictedBlocks))
+    assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+    when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
+    assert(mm.storageMemoryUsed === 1000L)
+    evictedBlocks.clear()
+    assert(!mm.acquireUnrollMemory(dummyBlock, 150L, evictedBlocks))
+    assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 400 - 300
+    assert(mm.storageMemoryUsed === 900L) // 100 bytes were evicted
     // Release beyond what was acquired
     mm.releaseUnrollMemory(maxStorageMem)
     assert(mm.storageMemoryUsed === 0L)

http://git-wip-us.apache.org/repos/asf/spark/blob/aec5ea00/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e97c898..71221de 100644
--- 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -17,16 +17,13 @@
 
 package org.apache.spark.memory
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
TestBlockId}
+import org.apache.spark.storage.{MemoryStore, TestBlockId}
 
 class UnifiedMemoryManagerSuite extends MemoryManagerSuite with 
PrivateMethodTester {
   private val dummyBlock = TestBlockId("--")
-  private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
 
   private val storageFraction: Double = 0.5
 
@@ -78,33 +75,40 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     val (mm, ms) = makeThings(maxMemory)
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
-    // `ensureFreeSpace` should be called with the number of bytes requested
-    assertEnsureFreeSpaceCalled(ms, 10L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 10L)
+
     assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 100L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire more than the max, not granted
     assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 110L)
     // Acquire up to the max, requests after this are still granted due to LRU 
eviction
     assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 1000L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 110L)
     assert(mm.storageMemoryUsed === 1000L)
+    assert(evictedBlocks.nonEmpty)
+    evictedBlocks.clear()
     assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 1L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 1L)
+    assert(evictedBlocks.nonEmpty)
+    evictedBlocks.clear()
+    // Note: We evicted 1 byte to put another 1-byte block in, so the storage 
memory used remains at
+    // 1000 bytes. This is different from real behavior, where the 1-byte 
block would have evicted
+    // the 1000-byte block entirely. This is set up differently so we can 
write finer-grained tests.
     assert(mm.storageMemoryUsed === 1000L)
     mm.releaseStorageMemory(800L)
     assert(mm.storageMemoryUsed === 200L)
     // Acquire after release
     assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 1L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 201L)
     mm.releaseAllStorageMemory()
     assert(mm.storageMemoryUsed === 0L)
     assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 1L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 1L)
     // Release beyond what was acquired
     mm.releaseStorageMemory(100L)
@@ -117,25 +121,27 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
     val (mm, ms) = makeThings(maxMemory)
     // Acquire enough storage memory to exceed the storage region
     assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 750L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.executionMemoryUsed === 0L)
     assert(mm.storageMemoryUsed === 750L)
     // Execution needs to request 250 bytes to evict storage memory
     assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
     assert(mm.executionMemoryUsed === 100L)
     assert(mm.storageMemoryUsed === 750L)
-    assertEnsureFreeSpaceNotCalled(ms)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Execution wants 200 bytes but only 150 are free, so storage is evicted
     assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 200L)
     assert(mm.executionMemoryUsed === 300L)
-    assertEnsureFreeSpaceCalled(ms, 50L)
-    assert(mm.executionMemoryUsed === 300L)
+    assert(mm.storageMemoryUsed === 700L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 50L)
+    assert(evictedBlocks.nonEmpty)
+    evictedBlocks.clear()
     mm.releaseAllStorageMemory()
     require(mm.executionMemoryUsed === 300L)
     require(mm.storageMemoryUsed === 0, "bad test: all storage memory should 
have been released")
     // Acquire some storage memory again, but this time keep it within the 
storage region
     assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
-    assertEnsureFreeSpaceCalled(ms, 400L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(mm.storageMemoryUsed === 400L)
     assert(mm.executionMemoryUsed === 300L)
     // Execution cannot evict storage because the latter is within the storage 
fraction,
@@ -143,7 +149,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 300L)
     assert(mm.executionMemoryUsed === 600L)
     assert(mm.storageMemoryUsed === 400L)
-    assertEnsureFreeSpaceNotCalled(ms)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+  }
+
+  test("execution memory requests smaller than free memory should evict 
storage (SPARK-12165)") {
+    val maxMemory = 1000L
+    val taskAttemptId = 0L
+    val (mm, ms) = makeThings(maxMemory)
+    // Acquire enough storage memory to exceed the storage region size
+    assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks))
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+    assert(mm.executionMemoryUsed === 0L)
+    assert(mm.storageMemoryUsed === 700L)
+    // SPARK-12165: previously, MemoryStore would not evict anything because 
it would
+    // mistakenly think that the 300 bytes of free space was still available 
even after
+    // using it to expand the execution pool. Consequently, no storage memory 
was released
+    // and the following call granted only 300 bytes to execution.
+    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 500L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 200L)
+    assert(mm.storageMemoryUsed === 500L)
+    assert(mm.executionMemoryUsed === 500L)
+    assert(evictedBlocks.nonEmpty)
   }
 
   test("storage does not evict execution") {
@@ -154,32 +180,34 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
     assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 800L)
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 0L)
-    assertEnsureFreeSpaceNotCalled(ms)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Storage should not be able to evict execution
     assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 100L)
-    assertEnsureFreeSpaceCalled(ms, 100L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
     assert(mm.executionMemoryUsed === 800L)
     assert(mm.storageMemoryUsed === 100L)
-    assertEnsureFreeSpaceCalled(ms, 250L)
+    // Do not attempt to evict blocks, since evicting will not free enough 
memory:
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
     mm.releaseStorageMemory(maxMemory)
     // Acquire some execution memory again, but this time keep it within the 
execution region
     assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 200L)
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 0L)
-    assertEnsureFreeSpaceNotCalled(ms)
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
     // Storage should still not be able to evict execution
     assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 750L)
-    assertEnsureFreeSpaceCalled(ms, 750L)
+    assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes 
free
     assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
     assert(mm.executionMemoryUsed === 200L)
     assert(mm.storageMemoryUsed === 750L)
-    assertEnsureFreeSpaceCalled(ms, 850L)
+    // Do not attempt to evict blocks, since evicting will not free enough 
memory:
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
   }
 
   test("small heap") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to