Repository: spark
Updated Branches:
  refs/heads/master 23a9e62ba -> 5030923ea


[SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management

**Problem.** In unified memory management, acquiring execution memory may lead 
to eviction of storage memory. However, the space freed from evicting cached 
blocks is distributed among all active tasks. Thus, an incorrect upper bound on 
the execution memory per task can cause the acquisition to fail, leading to 
OOMs and premature spills.

**Example.** Suppose total memory is 1000B, cached blocks occupy 900B, 
`spark.memory.storageFraction` is 0.4, and there are two active tasks. In this 
case, the cap on task execution memory is 100B / 2 = 50B. If task A tries to 
acquire 200B, it will evict 100B of storage but can only acquire 50B because of 
the incorrect cap. For another example, see this
[regression test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233)
that I stole from JoshRosen.

**Solution.** Fix the cap on task execution memory. It should take into account 
the space that could have been freed by storage in addition to the current 
amount of memory available to execution. In the example above, the correct cap 
should have been 600B / 2 = 300B.
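
The arithmetic in the example can be checked with a small sketch. The object and method names below (`TaskMemoryCap`, `oldCap`, `newCap`) are hypothetical and exist only for illustration; the new-cap formula follows `computeMaxExecutionPoolSize` in this patch, and the old cap uses the 100B execution pool size quoted in the example.

```scala
// Illustrative only: these names are not part of Spark.
object TaskMemoryCap {
  /** Old cap: the current execution pool size split evenly among active tasks. */
  def oldCap(executionPoolSize: Long, numActiveTasks: Int): Long =
    executionPoolSize / numActiveTasks

  /** New cap: also counts storage memory that execution could reclaim by eviction. */
  def newCap(
      maxMemory: Long,
      storageUsed: Long,
      storageRegionSize: Long,
      numActiveTasks: Int): Long =
    (maxMemory - math.min(storageUsed, storageRegionSize)) / numActiveTasks

  def main(args: Array[String]): Unit = {
    val maxMemory = 1000L
    val storageUsed = 900L
    val storageRegionSize = 400L // spark.memory.storageFraction = 0.4 of 1000B
    val numActiveTasks = 2

    println(oldCap(maxMemory - storageUsed, numActiveTasks))                      // 50
    println(newCap(maxMemory, storageUsed, storageRegionSize, numActiveTasks))    // 300
  }
}
```

Only the storage that sits above the protected storage region (900B - 400B = 500B here) counts as reclaimable, which is why the new cap is 600B / 2 rather than 1000B / 2.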

This patch also guards against the following race condition (SPARK-12253):
(1) Existing tasks collectively occupy all execution memory
(2) A new task comes in and blocks while existing tasks spill
(3) After the tasks finish spilling, another task jumps in and caches a large
block, stealing the freed memory
(4) The new task still cannot acquire memory and goes back to sleep
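
A toy, single-threaded model of the guard may help: every acquisition attempt first tries to evict storage again, so a block cached between attempts cannot starve the waiting task. `ToyUnifiedMemory` and its members are hypothetical names, and the sketch deliberately leaves out the per-task cap and the `wait()`/`notifyAll()` machinery of the real pool.

```scala
// Illustrative only: a simplified model, not Spark's implementation.
final class ToyUnifiedMemory(val maxMemory: Long, val storageRegionSize: Long) {
  var storageUsed: Long = 0L   // bytes held by cached blocks
  var executionUsed: Long = 0L // bytes held by running tasks

  private def free: Long = maxMemory - storageUsed - executionUsed

  /** Storage may borrow any memory that is currently free. */
  def cacheBlock(bytes: Long): Boolean =
    if (bytes <= free) { storageUsed += bytes; true } else false

  /** A task spills and returns execution memory to the pool. */
  def spill(bytes: Long): Unit =
    executionUsed -= math.min(bytes, executionUsed)

  /**
   * One acquisition attempt. A grant of 0 means the caller would go back to sleep.
   * Eviction is retried on every attempt, mirroring the per-iteration
   * `maybeGrowPool` call added by this patch.
   */
  def attempt(numBytes: Long): Long = {
    val shortfall = numBytes - free
    if (shortfall > 0) {
      // Only storage above the protected region can be evicted.
      val evictable = math.max(0L, storageUsed - storageRegionSize)
      storageUsed -= math.min(shortfall, evictable)
    }
    val granted = math.min(numBytes, free)
    executionUsed += granted
    granted
  }
}

object RaceDemo {
  def main(args: Array[String]): Unit = {
    val mem = new ToyUnifiedMemory(maxMemory = 1000L, storageRegionSize = 0L)
    mem.executionUsed = 1000L    // (1) existing tasks hold everything
    println(mem.attempt(300L))   // (2) new task gets nothing and would block
    mem.spill(300L)              //     existing tasks spill
    println(mem.cacheBlock(300L)) // (3) a cached block steals the freed memory
    println(mem.attempt(300L))   // (4) the retry evicts the block and succeeds
  }
}
```

If eviction ran only once before the first attempt, the retry in step (4) would find no free memory and the task would sleep again.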

Author: Andrew Or <and...@databricks.com>

Closes #10240 from andrewor14/fix-oom.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5030923e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5030923e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5030923e

Branch: refs/heads/master
Commit: 5030923ea8bb94ac8fa8e432de9fc7089aa93986
Parents: 23a9e62
Author: Andrew Or <and...@databricks.com>
Authored: Thu Dec 10 15:30:08 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Dec 10 15:30:08 2015 -0800

----------------------------------------------------------------------
 .../spark/memory/ExecutionMemoryPool.scala      | 57 ++++++++++++++------
 .../spark/memory/UnifiedMemoryManager.scala     | 57 +++++++++++++++-----
 .../scala/org/apache/spark/scheduler/Task.scala |  6 +++
 .../memory/UnifiedMemoryManagerSuite.scala      | 25 +++++++++
 4 files changed, 114 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 9023e1a..dbb0ad8 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
    * active tasks) before it is forced to spill. This can happen if the number of tasks increase
    * but an older task had a lot of memory already.
    *
+   * @param numBytes number of bytes to acquire
+   * @param taskAttemptId the task attempt acquiring memory
+   * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
+   *                      one parameter (Long) that represents the desired amount of memory by
+   *                      which this pool should be expanded.
+   * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
+   *                           at this given moment. This is not a field because the max pool
+   *                           size is variable in certain cases. For instance, in unified
+   *                           memory management, the execution pool can be expanded by evicting
+   *                           cached blocks, thereby shrinking the storage pool.
+   *
    * @return the number of bytes granted to the task.
    */
-  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+  private[memory] def acquireMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
     assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
 
+    // TODO: clean up this clunky method signature
+
     // Add this task to the taskMemory map just so we can keep an accurate count of the number
     // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
     if (!memoryForTask.contains(taskAttemptId)) {
@@ -91,25 +108,31 @@ private[memory] class ExecutionMemoryPool(
       val numActiveTasks = memoryForTask.keys.size
       val curMem = memoryForTask(taskAttemptId)
 
-      // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
-      // don't let it be negative
-      val maxToGrant =
-        math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+      // In every iteration of this loop, we should first try to reclaim any borrowed execution
+      // space from storage. This is necessary because of the potential race condition where new
+      // storage blocks may steal the free execution memory that this task was waiting for.
+      maybeGrowPool(numBytes - memoryFree)
+
+      // Maximum size the pool would have after potentially growing the pool.
+      // This is used to compute the upper bound of how much memory each task can occupy. This
+      // must take into account potential free memory as well as the amount this pool currently
+      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
+      // we did not take into account space that could have been freed by evicting cached blocks.
+      val maxPoolSize = computeMaxPoolSize()
+      val maxMemoryPerTask = maxPoolSize / numActiveTasks
+      val minMemoryPerTask = poolSize / (2 * numActiveTasks)
+
+      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
+      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
       // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
       val toGrant = math.min(maxToGrant, memoryFree)
 
-      if (curMem < poolSize / (2 * numActiveTasks)) {
-        // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
-        // if we can't give it this much now, wait for other tasks to free up memory
-        // (this happens if older tasks allocated lots of memory before N grew)
-        if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
-          memoryForTask(taskAttemptId) += toGrant
-          return toGrant
-        } else {
-          logInfo(
-            s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
-          lock.wait()
-        }
+      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+      // if we can't give it this much now, wait for other tasks to free up memory
+      // (this happens if older tasks allocated lots of memory before N grew)
+      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
+        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+        lock.wait()
+        lock.wait()
       } else {
         memoryForTask(taskAttemptId) += toGrant
         return toGrant

http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0b9f6a9..829f054 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -81,22 +81,51 @@ private[spark] class UnifiedMemoryManager private[memory] (
     assert(numBytes >= 0)
     memoryMode match {
       case MemoryMode.ON_HEAP =>
-        if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
-          val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
-          // There is not enough free memory in the execution pool, so try to reclaim memory from
-          // storage. We can reclaim any free memory from the storage pool. If the storage pool
-          // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
-          // the memory that storage has borrowed from execution.
-          val memoryReclaimableFromStorage =
-            math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
-          if (memoryReclaimableFromStorage > 0) {
-            // Only reclaim as much space as is necessary and available:
-            val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
-              math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-            onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+
+        /**
+         * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+         *
+         * When acquiring memory for a task, the execution pool may need to make multiple
+         * attempts. Each attempt must be able to evict storage in case another task jumps in
+         * and caches a large block between the attempts. This is called once per attempt.
+         */
+        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+          if (extraMemoryNeeded > 0) {
+            // There is not enough free memory in the execution pool, so try to reclaim memory from
+            // storage. We can reclaim any free memory from the storage pool. If the storage pool
+            // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+            // the memory that storage has borrowed from execution.
+            val memoryReclaimableFromStorage =
+              math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+            if (memoryReclaimableFromStorage > 0) {
+              // Only reclaim as much space as is necessary and available:
+              val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+              onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+            }
           }
         }
-        onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+
+        /**
+         * The size the execution pool would have after evicting storage memory.
+         *
+         * The execution memory pool divides this quantity among the active tasks evenly to cap
+         * the execution memory allocation for each task. It is important to keep this greater
+         * than the execution pool size, which doesn't take into account potential memory that
+         * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+         *
+         * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+         * in execution memory allocation across tasks, Otherwise, a task may occupy more than
+         * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+         * the portion of storage memory that cannot be evicted.
+         */
+        def computeMaxExecutionPoolSize(): Long = {
+          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+        }
+
+        onHeapExecutionMemoryPool.acquireMemory(
+          numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+
       case MemoryMode.OFF_HEAP =>
         // For now, we only support on-heap caching of data, so we do not need to interact with
         // the storage pool when allocating off-heap memory. This will change in the future, though.

http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d4bc3a5..9f27eed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -92,6 +92,12 @@ private[spark] abstract class Task[T](
         Utils.tryLogNonFatalError {
           // Release memory used by this thread for unrolling blocks
           SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+          // Notify any tasks waiting for execution memory to be freed to wake up and try to
+          // acquire memory again. This makes impossible the scenario where a task sleeps forever
+          // because there are no other tasks left to notify it. Since this is safe to do but may
+          // not be strictly necessary, we should revisit whether we can remove this in the future.
+          val memoryManager = SparkEnv.get.memoryManager
+          memoryManager.synchronized { memoryManager.notifyAll() }
         }
       } finally {
         TaskContext.unset()

http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e21a028..6cc4859 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -230,4 +230,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assert(exception.getMessage.contains("larger heap size"))
   }
 
+  test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "0")
+      .set("spark.testing.memory", "1000")
+    val mm = UnifiedMemoryManager(conf, numCores = 2)
+    val ms = makeMemoryStore(mm)
+    assert(mm.maxMemory === 1000)
+    // Have two tasks each acquire some execution memory so that the memory pool registers that
+    // there are two active tasks:
+    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
+    // Fill up all of the remaining memory with storage.
+    assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+    assert(mm.storageMemoryUsed === 800)
+    assert(mm.executionMemoryUsed === 200)
+    // A task should still be able to allocate 100 bytes execution memory by evicting blocks
+    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+    assert(mm.executionMemoryUsed === 300)
+    assert(mm.storageMemoryUsed === 700)
+    assert(evictedBlocks.nonEmpty)
+  }
+
 }

