Repository: spark
Updated Branches:
refs/heads/branch-1.6 acd462420 -> 05e441e12
[SPARK-12165][SPARK-12189] Fix bugs in eviction of storage memory by execution
This patch fixes a bug in the eviction of storage memory by execution.
## The bug:
In general, execution should be able to evict storage memory when the total
storage memory usage is greater than `maxMemory *
spark.memory.storageFraction`. Due to a bug, however, Spark might wind up
evicting no storage memory in certain cases where the storage memory usage was
between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For
example, here is a regression test which illustrates the bug:
```scala
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Since we used the default storage fraction (0.5), we should be able to allocate 500 bytes
// of storage memory which are immune to eviction by execution memory pressure.

// Acquire enough storage memory to exceed the storage region size
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// At this point, storage is using 250 more bytes of memory than it is guaranteed, so execution
// should be able to reclaim up to 250 bytes of storage memory.
// Therefore, execution should now be able to acquire up to 500 bytes of memory:
assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) // <--- fails by only returning 250L
assert(mm.storageMemoryUsed === 500L)
assert(mm.executionMemoryUsed === 500L)
assertEvictBlocksToFreeSpaceCalled(ms, 250L)
```
The problem lies in the interaction between
`StorageMemoryPool.shrinkPoolToFreeSpace()` and
`MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of
execution memory, the `UnifiedMemoryManager` discovers that it will need to
reclaim 250 bytes of memory from storage, so it calls
`StorageMemoryPool.shrinkPoolToFreeSpace(250L)`. This method, in turn, calls
`MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks
whether the requested space is no greater than `maxStorageMemory -
storageMemoryUsed` and, if so, returns without evicting anything. That check
passes whenever there is enough free execution memory, because
`MemoryStore.maxStorageMemory = (maxMemory -
onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.
In the test above, `maxStorageMemory - storageMemoryUsed` is `1000 - 750 = 250`,
so the request for 250 bytes is treated as already satisfied and no blocks are
evicted.
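To make the arithmetic concrete, here is a minimal sketch of the old `ensureFreeSpace()` check. It is a toy model and not Spark's actual classes: the object name `EnsureFreeSpaceSketch` and the function `bytesToEvict` are hypothetical, and the eviction branch is simplified to return the minimum number of bytes that would have to be evicted.

```scala
// Toy model of the pre-patch check; names here are illustrative, not Spark APIs.
object EnsureFreeSpaceSketch {
  /** Minimum number of bytes the old-style check would decide to evict. */
  def bytesToEvict(
      maxMemory: Long,
      executionMemoryUsed: Long,
      storageMemoryUsed: Long,
      space: Long): Long = {
    // With the unified manager, the store's limit only shrinks as execution *uses* memory.
    val maxStorageMemory = maxMemory - executionMemoryUsed
    val freeMemory = maxStorageMemory - storageMemoryUsed
    // Old behaviour: if the request already "fits", return without evicting anything.
    if (freeMemory >= space) 0L else space - freeMemory
  }
}
```

With the numbers from the regression test, `EnsureFreeSpaceSketch.bytesToEvict(1000L, 0L, 750L, 250L)` evaluates to `0`: execution has not used any memory yet, so the 250 requested bytes appear to be free and nothing is evicted.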
The control flow here is somewhat confusing: it grew messy over time as several
components were merged and refactored. In the pre-Spark 1.6 code,
`ensureFreeSpace` was called directly by the `MemoryStore` itself, whereas in
1.6 it is part of a circular control flow in which `MemoryStore` calls
`MemoryManager.acquireStorageMemory`, which then calls back into
`MemoryStore.ensureFreeSpace`, which, in turn, calls
`MemoryManager.freeStorageMemory`.
## The solution:
The solution implemented in this patch is to remove the confusing circular
control flow between `MemoryManager` and `MemoryStore`, making the storage
memory acquisition process much more linear / straightforward. The key changes:
- Remove a layer of inheritance which made the memory manager code harder to
understand (53841174760a24a0df3eb1562af1f33dbe340eb9).
- Move some bounds checks earlier in the call chain
(13ba7ada77f87ef1ec362aec35c89a924e6987cb).
- Refactor `ensureFreeSpace()` so that the part which evicts blocks can be
called independently from the part which checks whether there is enough free
space to avoid eviction (7c68ca09cb1b12f157400866983f753ac863380e).
- Realize that this lets us remove a layer of overloads from `ensureFreeSpace`
(eec4f6c87423d5e482b710e098486b3bbc4daf06).
- Realize that `ensureFreeSpace()` can simply be replaced with an
`evictBlocksToFreeSpace()` method which is called [after we've already figured
out](https://github.com/apache/spark/blob/2dc842aea82c8895125d46a00aa43dfb0d121de9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala#L88)
how much memory needs to be reclaimed via eviction
(2dc842aea82c8895125d46a00aa43dfb0d121de9).
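The resulting linear flow can be sketched roughly as follows. This is a simplified toy pool, not the real `StorageMemoryPool`: the names `LinearAcquireSketch` and `ToyStoragePool` are hypothetical, and the `evict` callback stands in for `MemoryStore.evictBlocksToFreeSpace()` by returning the number of bytes it freed, whereas the real eviction path releases memory by calling back into the pool.

```scala
// Toy model of the post-patch acquisition flow; names are illustrative only.
object LinearAcquireSketch {
  // `evict` is a stand-in for MemoryStore.evictBlocksToFreeSpace(); it returns bytes freed.
  final class ToyStoragePool(val poolSize: Long, evict: Long => Long) {
    private var used = 0L
    def memoryUsed: Long = used
    def memoryFree: Long = poolSize - used

    def acquireMemory(numBytes: Long): Boolean = {
      require(numBytes >= 0)
      // Step 1: decide up front how much must be evicted, if anything.
      val numBytesToFree = math.max(0L, numBytes - memoryFree)
      // Step 2: call the eviction routine only when eviction is actually needed.
      if (numBytesToFree > 0) {
        used -= evict(numBytesToFree)
      }
      // Step 3: grant the request only if it now fits.
      val enoughMemory = numBytes <= memoryFree
      if (enoughMemory) used += numBytes
      enoughMemory
    }
  }
}
```

The point of this design is that the decision about how much to evict is made once, by the caller, so the eviction routine never needs its own "is there already enough free space" check.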
Along the way, I fixed some problems with the mocks in `MemoryManagerSuite`:
the old mocks would
[unconditionally](https://github.com/apache/spark/blob/80a824d36eec9d9a9f092ee1741453851218ec73/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L84)
report that a block had been evicted even if there was enough space in the
storage pool such that eviction would be avoided.
I also fixed a problem where `StorageMemoryPool._memoryUsed` might become
negative due to freed memory being double-counted when execution evicts storage.
The problem was that `StorageMemoryPool.shrinkPoolToFreeSpace` would [decrement
`_memoryUsed`](https://github.com/apache/spark/commit/7c68ca09cb1b12f157400866983f753ac863380e#diff-935c68a9803be144ed7bafdd2f756a0fL133)
even though `StorageMemoryPool.freeMemory` had already decremented it as each
evicted block was freed. See SPARK-12189 for details.
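The double-counting can be illustrated with a small sketch. This is a toy pool rather than Spark's real code: `DoubleCountSketch`, `ToyPool`, `shrinkByEviction` and the `decrementAgain` flag are hypothetical names, with the flag switching between the old (buggy) and new accounting.

```scala
// Toy model of the accounting bug; names are illustrative only.
object DoubleCountSketch {
  final class ToyPool(initialUsed: Long) {
    private var _memoryUsed = initialUsed
    def memoryUsed: Long = _memoryUsed

    // Called once per evicted block, as the block is dropped from memory.
    def releaseMemory(size: Long): Unit = _memoryUsed -= size

    /** Evicts blocks of the given sizes; `decrementAgain = true` mimics the old behaviour. */
    def shrinkByEviction(evictedSizes: Seq[Long], decrementAgain: Boolean): Long = {
      // Each eviction already decrements the usage counter.
      evictedSizes.foreach(releaseMemory)
      val spaceFreedByEviction = evictedSizes.sum
      // Old behaviour: subtract the same bytes a second time.
      if (decrementAgain) _memoryUsed -= spaceFreedByEviction
      spaceFreedByEviction
    }
  }
}
```

Starting from 300 bytes used and evicting blocks of 100 and 150 bytes, the old accounting leaves `memoryUsed` at `-200`, while the corrected accounting leaves it at `50`.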
Author: Josh Rosen <[email protected]>
Author: Andrew Or <[email protected]>
Closes #10170 from JoshRosen/SPARK-12165.
(cherry picked from commit aec5ea000ebb8921f42f006b694ef26f5df67d83)
Signed-off-by: Andrew Or <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05e441e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05e441e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05e441e1
Branch: refs/heads/branch-1.6
Commit: 05e441e121a86e0c105ad25010e4678f2f9e73e3
Parents: acd4624
Author: Josh Rosen <[email protected]>
Authored: Wed Dec 9 11:39:59 2015 -0800
Committer: Andrew Or <[email protected]>
Committed: Wed Dec 9 11:40:09 2015 -0800
----------------------------------------------------------------------
.../org/apache/spark/memory/MemoryManager.scala | 11 +-
.../spark/memory/StaticMemoryManager.scala | 37 ++++-
.../apache/spark/memory/StorageMemoryPool.scala | 37 ++---
.../spark/memory/UnifiedMemoryManager.scala | 8 +-
.../org/apache/spark/storage/MemoryStore.scala | 76 +++-------
.../spark/memory/MemoryManagerSuite.scala | 137 ++++++++++---------
.../spark/memory/StaticMemoryManagerSuite.scala | 52 +++----
.../memory/UnifiedMemoryManagerSuite.scala | 76 ++++++----
8 files changed, 230 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/05e441e1/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index ceb8ea4..ae9e1ac 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -77,9 +77,7 @@ private[spark] abstract class MemoryManager(
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean =
synchronized {
- storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
- }
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
/**
* Acquire N bytes of memory to unroll the given block, evicting existing
ones if necessary.
@@ -109,12 +107,7 @@ private[spark] abstract class MemoryManager(
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
- memoryMode: MemoryMode): Long = synchronized {
- memoryMode match {
- case MemoryMode.ON_HEAP =>
onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
- case MemoryMode.OFF_HEAP =>
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
- }
- }
+ memoryMode: MemoryMode): Long
/**
* Release numBytes of execution memory belonging to the given task.
http://git-wip-us.apache.org/repos/asf/spark/blob/05e441e1/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
----------------------------------------------------------------------
diff --git
a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 12a0943..3554b55 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -49,19 +49,50 @@ private[spark] class StaticMemoryManager(
}
// Max number of bytes worth of blocks to evict when unrolling
- private val maxMemoryToEvictForUnroll: Long = {
+ private val maxUnrollMemory: Long = {
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction",
0.2)).toLong
}
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean =
synchronized {
+ if (numBytes > maxStorageMemory) {
+ // Fail fast if the block simply won't fit
+ logInfo(s"Will not store $blockId as the required space ($numBytes
bytes) exceeds our " +
+ s"memory limit ($maxStorageMemory bytes)")
+ false
+ } else {
+ storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+ }
+ }
+
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean =
synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
- val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll -
currentUnrollMemory)
- val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
+ val freeMemory = storageMemoryPool.memoryFree
+ // When unrolling, we will use all of the existing free memory, and, if
necessary,
+ // some extra space freed from evicting cached blocks. We must place a cap
on the
+ // amount of memory to be evicted by unrolling, however, otherwise
unrolling one
+ // big block can blow away the entire cache.
+ val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory
- freeMemory)
+ // Keep it within the range 0 <= X <= maxNumBytesToFree
+ val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes -
freeMemory))
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree,
evictedBlocks)
}
+
+ private[memory]
+ override def acquireExecutionMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Long = synchronized {
+ memoryMode match {
+ case MemoryMode.ON_HEAP =>
onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ case MemoryMode.OFF_HEAP =>
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/05e441e1/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git
a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index fc4f035..70af83b 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -65,7 +65,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends
MemoryPool(lock) w
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean =
lock.synchronized {
- acquireMemory(blockId, numBytes, numBytes, evictedBlocks)
+ val numBytesToFree = math.max(0, numBytes - memoryFree)
+ acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}
/**
@@ -73,7 +74,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends
MemoryPool(lock) w
*
* @param blockId the ID of the block we are acquiring storage memory for
* @param numBytesToAcquire the size of this block
- * @param numBytesToFree the size of space to be freed through evicting
blocks
+ * @param numBytesToFree the amount of space to be freed through evicting
blocks
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(
@@ -84,16 +85,18 @@ private[memory] class StorageMemoryPool(lock: Object)
extends MemoryPool(lock) w
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
- memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
- // Register evicted blocks, if any, with the active task metrics
- Option(TaskContext.get()).foreach { tc =>
- val metrics = tc.taskMetrics()
- val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId,
BlockStatus)]())
- metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+ if (numBytesToFree > 0) {
+ memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree,
evictedBlocks)
+ // Register evicted blocks, if any, with the active task metrics
+ Option(TaskContext.get()).foreach { tc =>
+ val metrics = tc.taskMetrics()
+ val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId,
BlockStatus)]())
+ metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+ }
}
// NOTE: If the memory store evicts blocks, then those evictions will
synchronously call
- // back into this StorageMemoryPool in order to free. Therefore, these
variables should have
- // been updated.
+ // back into this StorageMemoryPool in order to free memory. Therefore,
these variables
+ // should have been updated.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
@@ -121,18 +124,20 @@ private[memory] class StorageMemoryPool(lock: Object)
extends MemoryPool(lock) w
*/
def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
// First, shrink the pool by reclaiming free memory:
- val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree)
+ val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
- if (spaceFreedByReleasingUnusedMemory == spaceToFree) {
- spaceFreedByReleasingUnusedMemory
- } else {
+ val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
+ if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin
evicting blocks:
val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- memoryStore.ensureFreeSpace(spaceToFree -
spaceFreedByReleasingUnusedMemory, evictedBlocks)
+ memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree,
evictedBlocks)
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
- _memoryUsed -= spaceFreedByEviction
+ // When a block is released, BlockManager.dropFromMemory() calls
releaseMemory(), so we do
+ // not need to decrement _memoryUsed here. However, we do need to
decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
+ } else {
+ spaceFreedByReleasingUnusedMemory
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/05e441e1/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0f1ea9a..0b9f6a9 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -100,7 +100,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
case MemoryMode.OFF_HEAP =>
// For now, we only support on-heap caching of data, so we do not need
to interact with
// the storage pool when allocating off-heap memory. This will change
in the future, though.
- super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
+ offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
@@ -110,6 +110,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean =
synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize ==
maxMemory)
assert(numBytes >= 0)
+ if (numBytes > maxStorageMemory) {
+ // Fail fast if the block simply won't fit
+ logInfo(s"Will not store $blockId as the required space ($numBytes
bytes) exceeds our " +
+ s"memory limit ($maxStorageMemory bytes)")
+ return false
+ }
if (numBytes > storageMemoryPool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow
free memory from
// the execution pool.
http://git-wip-us.apache.org/repos/asf/spark/blob/05e441e1/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 4dbac38..bdab8c2 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -406,85 +406,41 @@ private[spark] class MemoryStore(blockManager:
BlockManager, memoryManager: Memo
}
/**
- * Try to free up a given amount of space by evicting existing blocks.
- *
- * @param space the amount of memory to free, in bytes
- * @param droppedBlocks a holder for blocks evicted in the process
- * @return whether the requested free space is freed.
- */
- private[spark] def ensureFreeSpace(
- space: Long,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- ensureFreeSpace(None, space, droppedBlocks)
- }
-
- /**
- * Try to free up a given amount of space to store a block by evicting
existing ones.
- *
- * @param space the amount of memory to free, in bytes
- * @param droppedBlocks a holder for blocks evicted in the process
- * @return whether the requested free space is freed.
- */
- private[spark] def ensureFreeSpace(
- blockId: BlockId,
- space: Long,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- ensureFreeSpace(Some(blockId), space, droppedBlocks)
- }
-
- /**
- * Try to free up a given amount of space to store a particular block, but
can fail if
- * either the block is bigger than our memory or it would require replacing
another block
- * from the same RDD (which leads to a wasteful cyclic replacement pattern
for RDDs that
- * don't fit into memory that we want to avoid).
- *
- * @param blockId the ID of the block we are freeing space for, if any
- * @param space the size of this block
- * @param droppedBlocks a holder for blocks evicted in the process
- * @return whether the requested free space is freed.
- */
- private def ensureFreeSpace(
+ * Try to evict blocks to free up a given amount of space to store a
particular block.
+ * Can fail if either the block is bigger than our memory or it would
require replacing
+ * another block from the same RDD (which leads to a wasteful cyclic
replacement pattern for
+ * RDDs that don't fit into memory that we want to avoid).
+ *
+ * @param blockId the ID of the block we are freeing space for, if any
+ * @param space the size of this block
+ * @param droppedBlocks a holder for blocks evicted in the process
+ * @return whether the requested free space is freed.
+ */
+ private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ assert(space > 0)
memoryManager.synchronized {
- val freeMemory = maxMemory - memoryUsed
+ var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
- var selectedMemory = 0L
-
- logInfo(s"Ensuring $space bytes of free space " +
- blockId.map { id => s"for block $id" }.getOrElse("") +
- s"(free: $freeMemory, max: $maxMemory)")
-
- // Fail fast if the block simply won't fit
- if (space > maxMemory) {
- logInfo("Will not " + blockId.map { id => s"store $id"
}.getOrElse("free memory") +
- s" as the required space ($space bytes) exceeds our memory limit
($maxMemory bytes)")
- return false
- }
-
- // No need to evict anything if there is already enough free space
- if (freeMemory >= space) {
- return true
- }
-
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as
that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
- while (freeMemory + selectedMemory < space && iterator.hasNext) {
+ while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
- selectedMemory += pair.getValue.size
+ freedMemory += pair.getValue.size
}
}
}
- if (freeMemory + selectedMemory >= space) {
+ if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
http://git-wip-us.apache.org/repos/asf/spark/blob/05e441e1/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git
a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index f55d435..555b640 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -24,9 +24,10 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import org.mockito.Matchers.{any, anyLong}
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
@@ -36,105 +37,105 @@ import org.apache.spark.storage.{BlockId, BlockStatus,
MemoryStore, StorageLevel
/**
* Helper trait for sharing code among [[MemoryManager]] tests.
*/
-private[memory] trait MemoryManagerSuite extends SparkFunSuite {
+private[memory] trait MemoryManagerSuite extends SparkFunSuite with
BeforeAndAfterEach {
- import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
+ protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]
+
+ import MemoryManagerSuite.DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED
// Note: Mockito's verify mechanism does not provide a way to reset method
call counts
// without also resetting stubbed methods. Since our test code relies on the
latter,
- // we need to use our own variable to track invocations of `ensureFreeSpace`.
+ // we need to use our own variable to track invocations of
`evictBlocksToFreeSpace`.
/**
- * The amount of free space requested in the last call to
[[MemoryStore.ensureFreeSpace]]
+ * The amount of space requested in the last call to
[[MemoryStore.evictBlocksToFreeSpace]].
*
- * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared
when the test
- * code makes explicit assertions on this variable through
[[assertEnsureFreeSpaceCalled]].
+ * This set whenever [[MemoryStore.evictBlocksToFreeSpace]] is called, and
cleared when the test
+ * code makes explicit assertions on this variable through
+ * [[assertEvictBlocksToFreeSpaceCalled]].
*/
- private val ensureFreeSpaceCalled = new
AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+ private val evictBlocksToFreeSpaceCalled = new AtomicLong(0)
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ evictedBlocks.clear()
+ evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED)
+ }
/**
- * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]]
method is stubbed.
+ * Make a mocked [[MemoryStore]] whose
[[MemoryStore.evictBlocksToFreeSpace]] method is stubbed.
*
- * This allows our test code to release storage memory when
[[MemoryStore.ensureFreeSpace]]
- * is called without relying on [[org.apache.spark.storage.BlockManager]]
and all of its
- * dependencies.
+ * This allows our test code to release storage memory when these methods
are called
+ * without relying on [[org.apache.spark.storage.BlockManager]] and all of
its dependencies.
*/
protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
- val ms = mock(classOf[MemoryStore])
- when(ms.ensureFreeSpace(anyLong(),
any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
- when(ms.ensureFreeSpace(any(), anyLong(),
any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
+ val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
+ when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
+ .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
mm.setMemoryStore(ms)
ms
}
/**
- * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the
right arguments.
- */
- private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int):
Answer[Boolean] = {
+ * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that
releases storage memory.
+ *
+ * This is a significant simplification of the real method, which actually
drops existing
+ * blocks based on the size of each block. Instead, here we simply release
as many bytes
+ * as needed to ensure the requested amount of free space. This allows us
to set up the
+ * test without relying on the [[org.apache.spark.storage.BlockManager]],
which brings in
+ * many other dependencies.
+ *
+ * Every call to this method will set a global variable,
[[evictBlocksToFreeSpaceCalled]], that
+ * records the number of bytes this is called with. This variable is
expected to be cleared
+ * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
+ */
+ private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean]
= {
new Answer[Boolean] {
override def answer(invocation: InvocationOnMock): Boolean = {
val args = invocation.getArguments
- require(args.size > numBytesPos, s"bad test: expected >$numBytesPos
arguments " +
- s"in ensureFreeSpace, found ${args.size}")
- require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected
ensureFreeSpace " +
- s"argument at index $numBytesPos to be a Long: ${args.mkString(",
")}")
- val numBytes = args(numBytesPos).asInstanceOf[Long]
- val success = mockEnsureFreeSpace(mm, numBytes)
- if (success) {
+ val numBytesToFree = args(1).asInstanceOf[Long]
+ assert(numBytesToFree > 0)
+ require(evictBlocksToFreeSpaceCalled.get() ===
DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
+ "bad test: evictBlocksToFreeSpace() variable was not reset")
+ evictBlocksToFreeSpaceCalled.set(numBytesToFree)
+ if (numBytesToFree <= mm.storageMemoryUsed) {
+ // We can evict enough blocks to fulfill the request for space
+ mm.releaseStorageMemory(numBytesToFree)
args.last.asInstanceOf[mutable.Buffer[(BlockId,
BlockStatus)]].append(
- (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
+ (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L,
0L)))
+ // We need to add this call so that that the suite-level
`evictedBlocks` is updated when
+ // execution evicts storage; in that case, args.last will not be
equal to evictedBlocks
+ // because it will be a temporary buffer created inside of the
MemoryManager rather than
+ // being passed in by the test code.
+ if (!(evictedBlocks eq args.last)) {
+ evictedBlocks.append(
+ (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L,
0L)))
+ }
+ true
+ } else {
+ // No blocks were evicted because eviction would not free enough
space.
+ false
}
- success
- }
- }
- }
-
- /**
- * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases
storage memory.
- *
- * This is a significant simplification of the real method, which actually
drops existing
- * blocks based on the size of each block. Instead, here we simply release
as many bytes
- * as needed to ensure the requested amount of free space. This allows us to
set up the
- * test without relying on the [[org.apache.spark.storage.BlockManager]],
which brings in
- * many other dependencies.
- *
- * Every call to this method will set a global variable,
[[ensureFreeSpaceCalled]], that
- * records the number of bytes this is called with. This variable is
expected to be cleared
- * by the test code later through [[assertEnsureFreeSpaceCalled]].
- */
- private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean
= mm.synchronized {
- require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
- "bad test: ensure free space variable was not reset")
- // Record the number of bytes we freed this call
- ensureFreeSpaceCalled.set(numBytes)
- if (numBytes <= mm.maxStorageMemory) {
- def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
- val spaceToRelease = numBytes - freeMemory
- if (spaceToRelease > 0) {
- mm.releaseStorageMemory(spaceToRelease)
}
- freeMemory >= numBytes
- } else {
- // We attempted to free more bytes than our max allowable memory
- false
}
}
/**
- * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given
parameters.
+ * Assert that [[MemoryStore.evictBlocksToFreeSpace]] is called with the
given parameters.
*/
- protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long):
Unit = {
- assert(ensureFreeSpaceCalled.get() === numBytes,
- s"expected ensure free space to be called with $numBytes")
- ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+ protected def assertEvictBlocksToFreeSpaceCalled(ms: MemoryStore, numBytes:
Long): Unit = {
+ assert(evictBlocksToFreeSpaceCalled.get() === numBytes,
+ s"expected evictBlocksToFreeSpace() to be called with $numBytes")
+ evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED)
}
/**
- * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called.
+ * Assert that [[MemoryStore.evictBlocksToFreeSpace]] is NOT called.
*/
- protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
- assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
- "ensure free space should not have been called!")
+ protected def assertEvictBlocksToFreeSpaceNotCalled[T](ms: MemoryStore):
Unit = {
+ assert(evictBlocksToFreeSpaceCalled.get() ===
DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
+ "evictBlocksToFreeSpace() should not have been called!")
+ assert(evictedBlocks.isEmpty)
}
/**
@@ -291,5 +292,5 @@ private[memory] trait MemoryManagerSuite extends
SparkFunSuite {
}
private object MemoryManagerSuite {
- private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L
+ private val DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED = -1L
}
http://git-wip-us.apache.org/repos/asf/spark/blob/05e441e1/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git
a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 54cb28c..6700b94 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -17,16 +17,13 @@
package org.apache.spark.memory
-import scala.collection.mutable.ArrayBuffer
-
import org.mockito.Mockito.when
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore,
TestBlockId}
+import org.apache.spark.storage.{MemoryStore, TestBlockId}
class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
- private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
/**
* Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class
dependencies.
@@ -85,33 +82,38 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
- // `ensureFreeSpace` should be called with the number of bytes requested
- assertEnsureFreeSpaceCalled(ms, 10L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 10L)
+
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire more than the max, not granted
assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire up to the max, requests after this are still granted due to LRU eviction
assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1000L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 110L)
assert(mm.storageMemoryUsed === 1000L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 1L)
+ assert(evictedBlocks.nonEmpty)
+ evictedBlocks.clear()
+ // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at
+ // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
+ // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
assert(mm.storageMemoryUsed === 1000L)
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 201L)
mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
mm.releaseStorageMemory(100L)
@@ -133,7 +135,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 50L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
// Only execution memory should be released
@@ -151,21 +153,25 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val dummyBlock = TestBlockId("lonely water")
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 100L)
+ when(ms.currentUnrollMemory).thenReturn(100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 100L)
mm.releaseUnrollMemory(40L)
assert(mm.storageMemoryUsed === 60L)
when(ms.currentUnrollMemory).thenReturn(60L)
- assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ assert(mm.storageMemoryUsed === 860L)
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
- // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
- assertEnsureFreeSpaceCalled(ms, 340L)
- assert(mm.storageMemoryUsed === 560L)
- when(ms.currentUnrollMemory).thenReturn(560L)
- assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
- assert(mm.storageMemoryUsed === 560L)
- // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed
- assertEnsureFreeSpaceCalled(ms, 0L)
+ // Since we already occupy 60 bytes, we will try to evict only 400 - 60 = 340 bytes.
+ assert(mm.acquireUnrollMemory(dummyBlock, 240L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+ when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
+ assert(mm.storageMemoryUsed === 1000L)
+ evictedBlocks.clear()
+ assert(!mm.acquireUnrollMemory(dummyBlock, 150L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 400 - 300
+ assert(mm.storageMemoryUsed === 900L) // 100 bytes were evicted
// Release beyond what was acquired
mm.releaseUnrollMemory(maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
http://git-wip-us.apache.org/repos/asf/spark/blob/05e441e1/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e97c898..71221de 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -17,16 +17,13 @@
package org.apache.spark.memory
-import scala.collection.mutable.ArrayBuffer
-
import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
+import org.apache.spark.storage.{MemoryStore, TestBlockId}
class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
private val dummyBlock = TestBlockId("--")
- private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
private val storageFraction: Double = 0.5
@@ -78,33 +75,40 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val (mm, ms) = makeThings(maxMemory)
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
- // `ensureFreeSpace` should be called with the number of bytes requested
- assertEnsureFreeSpaceCalled(ms, 10L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 10L)
+
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire more than the max, not granted
assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire up to the max, requests after this are still granted due to LRU eviction
assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1000L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 110L)
assert(mm.storageMemoryUsed === 1000L)
+ assert(evictedBlocks.nonEmpty)
+ evictedBlocks.clear()
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 1L)
+ assert(evictedBlocks.nonEmpty)
+ evictedBlocks.clear()
+ // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at
+ // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
+ // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
assert(mm.storageMemoryUsed === 1000L)
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 201L)
mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
mm.releaseStorageMemory(100L)
@@ -117,25 +121,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val (mm, ms) = makeThings(maxMemory)
// Acquire enough storage memory to exceed the storage region
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 750L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// Execution needs to request 250 bytes to evict storage memory
assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.executionMemoryUsed === 100L)
assert(mm.storageMemoryUsed === 750L)
- assertEnsureFreeSpaceNotCalled(ms)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
// Execution wants 200 bytes but only 150 are free, so storage is evicted
assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 300L)
- assertEnsureFreeSpaceCalled(ms, 50L)
- assert(mm.executionMemoryUsed === 300L)
+ assert(mm.storageMemoryUsed === 700L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 50L)
+ assert(evictedBlocks.nonEmpty)
+ evictedBlocks.clear()
mm.releaseAllStorageMemory()
require(mm.executionMemoryUsed === 300L)
require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
// Acquire some storage memory again, but this time keep it within the storage region
assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 400L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 400L)
assert(mm.executionMemoryUsed === 300L)
// Execution cannot evict storage because the latter is within the storage fraction,
@@ -143,7 +149,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
assert(mm.executionMemoryUsed === 600L)
assert(mm.storageMemoryUsed === 400L)
- assertEnsureFreeSpaceNotCalled(ms)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ }
+
+ test("execution memory requests smaller than free memory should evict storage (SPARK-12165)") {
+ val maxMemory = 1000L
+ val taskAttemptId = 0L
+ val (mm, ms) = makeThings(maxMemory)
+ // Acquire enough storage memory to exceed the storage region size
+ assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.storageMemoryUsed === 700L)
+ // SPARK-12165: previously, MemoryStore would not evict anything because it would
+ // mistakenly think that the 300 bytes of free space was still available even after
+ // using it to expand the execution pool. Consequently, no storage memory was released
+ // and the following call granted only 300 bytes to execution.
+ assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 200L)
+ assert(mm.storageMemoryUsed === 500L)
+ assert(mm.executionMemoryUsed === 500L)
+ assert(evictedBlocks.nonEmpty)
}
test("storage does not evict execution") {
@@ -154,32 +180,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 0L)
- assertEnsureFreeSpaceNotCalled(ms)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
// Storage should not be able to evict execution
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
- assertEnsureFreeSpaceCalled(ms, 100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
- assertEnsureFreeSpaceCalled(ms, 250L)
+ // Do not attempt to evict blocks, since evicting will not free enough memory:
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
mm.releaseStorageMemory(maxMemory)
// Acquire some execution memory again, but this time keep it within the execution region
assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 0L)
- assertEnsureFreeSpaceNotCalled(ms)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
// Storage should still not be able to evict execution
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 750L)
- assertEnsureFreeSpaceCalled(ms, 750L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free
assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 750L)
- assertEnsureFreeSpaceCalled(ms, 850L)
+ // Do not attempt to evict blocks, since evicting will not free enough memory:
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
}
test("small heap") {
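
The numbers asserted in the `UnifiedMemoryManagerSuite` tests above can be checked with a small standalone model. This is only a sketch of the arithmetic, not Spark's `UnifiedMemoryManager`: the `Pools` class and its `acquireExecution` method are hypothetical names introduced here, and the model assumes a single task (no per-task fair-share limits) and that storage blocks can be evicted at byte granularity, as the mocked `MemoryStore` in these tests does.

```scala
// Toy model of the unified-pool arithmetic; NOT Spark's UnifiedMemoryManager.
// `storageRegion` stands for maxMemory * spark.memory.storageFraction.
final case class Pools(
    maxMemory: Long,
    storageRegion: Long,
    storageUsed: Long,
    executionUsed: Long) {

  /** Returns (updated pools, bytes granted to execution, bytes evicted from storage). */
  def acquireExecution(requested: Long): (Pools, Long, Long) = {
    val free = maxMemory - storageUsed - executionUsed
    // Only storage memory above the protected region may be reclaimed by execution.
    val evictable = math.max(0L, storageUsed - storageRegion)
    val granted = math.min(requested, free + evictable)
    // Evict only the part of the grant that free memory cannot cover.
    val evicted = math.max(0L, granted - free)
    val updated = copy(
      storageUsed = storageUsed - evicted,
      executionUsed = executionUsed + granted)
    (updated, granted, evicted)
  }
}

object PoolsDemo extends App {
  // Mirrors the SPARK-12165 regression test: 700 bytes of storage, 500 requested.
  val start = Pools(maxMemory = 1000L, storageRegion = 500L, storageUsed = 700L, executionUsed = 0L)
  val (after, granted, evicted) = start.acquireExecution(500L)
  assert(granted == 500L)
  assert(evicted == 200L)
  assert(after.storageUsed == 500L && after.executionUsed == 500L)
}
```

Under these assumptions the model grants the full 500 bytes and evicts 200, matching `assertEvictBlocksToFreeSpaceCalled(ms, 200L)`; before the fix, the test comment notes that only the 300 free bytes were granted.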