[
https://issues.apache.org/jira/browse/SPARK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15239559#comment-15239559
]
Imran Rashid commented on SPARK-14560:
--------------------------------------
After doing some more testing, I realized my "fix" wasn't really a fix at
all -- it's not too hard to force the {{Spillable}} to spill, but the problem is
that if you've already got an iterator, it keeps a reference to the in-memory
data. So it was really just cheating, by having the execution layer use more
memory than it was reporting (since it was reporting that it had freed memory
after the spill, when in fact it had not). Getting the inflight iterators to
switch from data in memory to the new spill is pretty gnarly, and I would need
to do a bunch of testing before I felt confident in it.
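
To make the iterator problem concrete, here is a minimal Python sketch. It is not Spark code: {{ToySpillable}}, {{Buffer}} and {{force_spill}} are hypothetical names invented for illustration, and a weak reference stands in for "is this memory still held?". It only shows why a spill can report memory as freed while an in-flight iterator still pins the old in-memory data.

```python
import gc
import weakref


class Buffer(list):
    """A list subclass, used only because plain lists cannot be weakly referenced."""


class ToySpillable:
    """Hypothetical stand-in for a spillable collection; not Spark's API."""

    def __init__(self):
        self.records = Buffer()
        self.spilled = []  # pretend each entry is a spill file on disk

    def insert(self, record):
        self.records.append(record)

    def iterator(self):
        data = self.records  # once running, the iterator pins this buffer
        for record in data:
            yield record

    def force_spill(self):
        """Copy the buffer to 'disk', drop our reference, report records 'freed'."""
        freed = len(self.records)
        self.spilled.append(list(self.records))
        self.records = Buffer()
        return freed


spillable = ToySpillable()
for i in range(5):
    spillable.insert(i)

it = spillable.iterator()
first = next(it)  # the iterator is now in flight
old_buffer = weakref.ref(spillable.records)

reported_freed = spillable.force_spill()    # claims 5 records were freed
still_in_memory = old_buffer() is not None  # the iterator still holds them
remaining = list(it)                        # and keeps reading the old buffer

del it
gc.collect()
freed_for_real = old_buffer() is None       # released only once the iterator is gone
```

In this toy, the spill "succeeds" and reports five records freed, yet the old buffer stays alive until the iterator is dropped. That gap between reported and real usage is the cheating described above.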
However, I realized there are two solutions that are both much easier to
implement, safer, and can easily be put behind a conf to turn the behavior on
and off:
* always spill shuffle-read. Anytime there is a shuffle-to-shuffle stage and the
shuffle-read creates a {{Spillable}}, after it's done inserting all records, it
can spill all of its data to disk. This will prevent starvation. The downside
is that you force spilling even if it's not necessary. The benefit is that it
is really easy to do, and will prevent OOMs.
* reserve a fraction of execution memory for future operations. This is a
little trickier because (a) you've got to figure out if there are any future
execution operations in the same task and (b) come up with some sensible amount
of memory to reserve. Furthermore, though it can prevent an unnecessary spill
when there is enough memory, it actually may not be more efficient in all
scenarios. If there is a huge shuffle-read, which requires a lot of spills,
rather than keeping a little bit of it in memory at the very end, it probably
makes more sense to just create fewer spills in the first place by using all
available memory, and then spilling at the end to free memory for the next step.
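
The trade-off between the two options can be sketched with a toy spill count in Python. This is a rough model under stated assumptions, not Spark's accounting: data arrives in MB, a spill happens each time the usable buffer fills, option 1 spills the final buffer too, and option 2 keeps the final buffer in memory. The function names and the 20 MB reserve are made up for illustration.

```python
def ceil_div(a, b):
    """Integer ceiling division."""
    return -(-a // b)


def spills_always_spill(data_mb, memory_mb):
    """Option 1: use all memory while reading, then spill whatever is left."""
    return ceil_div(data_mb, memory_mb)


def spills_with_reserve(data_mb, memory_mb, reserve_mb):
    """Option 2: keep reserve_mb back for later operations, keep the last chunk in memory."""
    usable_mb = memory_mb - reserve_mb
    if usable_mb <= 0:
        raise ValueError("reserve must be smaller than total memory")
    return max(ceil_div(data_mb, usable_mb) - 1, 0)


# Small read (10 MB of data, 100 MB of memory): option 1 forces one unnecessary spill.
small_read = (spills_always_spill(10, 100), spills_with_reserve(10, 100, 20))

# Huge read (1000 MB of data, 100 MB of memory): the reserve causes more spills overall.
huge_read = (spills_always_spill(1000, 100), spills_with_reserve(1000, 100, 20))
```

Under these assumptions the small read gives (1, 0) and the huge read gives (10, 12), matching the argument above: reserving memory avoids the needless spill when everything fits, but costs extra spills when the shuffle-read is large.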
True cooperative memory management for spillables would still be nice, but
given the complexity of that I think something simpler which will prevent OOM
is a better first step.
> Cooperative Memory Management for Spillables
> --------------------------------------------
>
> Key: SPARK-14560
> URL: https://issues.apache.org/jira/browse/SPARK-14560
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.6.1
> Reporter: Imran Rashid
> Assignee: Imran Rashid
>
> SPARK-10342 introduced cooperative memory management for SQL operators that
> can spill; however, {{Spillable}}s used by the old RDD API still do not
> cooperate. This can lead to memory starvation, in particular on a
> shuffle-to-shuffle stage, eventually resulting in errors like:
> {noformat}
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Memory used in task 3081
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@69ab0291: 32.0 KB
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317230346 bytes of memory were used by task 3081 but are not associated with specific consumers
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317263114 bytes of memory are used for execution and 1710484 bytes of memory are used for storage
> 16/03/28 08:59:54 ERROR executor.Executor: Managed memory leak detected; size = 1317230346 bytes, TID = 3081
> 16/03/28 08:59:54 ERROR executor.Executor: Exception in task 533.0 in stage 3.0 (TID 3081)
> java.lang.OutOfMemoryError: Unable to acquire 75 bytes of memory, got 0
> at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
> at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
> at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
> at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
> at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This can happen anytime the shuffle read side requires more memory than what
> is available for the task. Since the shuffle-read side doubles its memory
> request each time, it can easily end up acquiring all of the available
> memory, even if it does not use it. E.g., say that after the final spill, the
> shuffle-read side requires 10 MB more memory, and there is 15 MB of memory
> available. But if it starts at 2 MB, it will double to 4, 8, and then
> request 16 MB of memory, and in fact get all available 15 MB. Since the 15
> MB of memory is sufficient, it will not spill, and will continue holding on
> to all available memory. But this leaves *no* memory available for the
> shuffle-write side. Since the shuffle-write side cannot request the
> shuffle-read side to free up memory, this leads to an OOM.
> The simple solution is to make {{Spillable}} implement {{MemoryConsumer}} as
> well, so RDDs can benefit from the cooperative memory management introduced
> by SPARK-10342.
> Note that an additional improvement would be for the shuffle-read side to
> simply release unused memory, without spilling, in case that would leave
> enough memory, and only spill if that was inadequate. However that can come
> as a later improvement.
> *Workaround*: You can set
> {{spark.shuffle.spill.numElementsForceSpillThreshold=N}} to force spilling to
> occur every {{N}} elements, thus preventing the shuffle-read side from ever
> grabbing all of the available memory. However, this requires careful tuning
> of {{N}} to specific workloads: too big, and you will still get an OOM; too
> small, and there will be so much spilling that performance will suffer
> drastically. Furthermore, this workaround uses an *undocumented*
> configuration with *no compatibility guarantees* for future versions of Spark.
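
The doubling example from the issue description can be reproduced with a small Python sketch. It is a toy model, not Spark's memory manager: {{simulate_doubling}} is a hypothetical helper, sizes are whole MB, and it assumes the reader keeps doubling its target until the target covers what it needs, with the grant capped at what is available.

```python
def simulate_doubling(start_mb, needed_mb, available_mb):
    """Return (requests, held_mb, left_for_write_mb) for a reader that doubles its target."""
    target = start_mb
    requests = [target]
    while target < needed_mb:
        target *= 2
        requests.append(target)
    held = min(target, available_mb)  # the grant is capped at what is available
    return requests, held, available_mb - held


# Numbers from the description: start at 2 MB, need 10 MB, 15 MB available.
requests, held_mb, left_for_write_mb = simulate_doubling(2, 10, 15)
```

With these inputs the requests are 2, 4, 8 and 16 MB, the reader ends up holding all 15 MB although it needs only 10, and nothing is left for the shuffle-write side.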