[ https://issues.apache.org/jira/browse/SPARK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666005#comment-15666005 ]

KaiXu commented on SPARK-14560:
-------------------------------

Spark 1.6.2 has the same issue:

16/11/15 10:11:15 INFO scheduler.TaskSetManager: Finished task 16761.0 in stage 1.0 (TID 17639) in 11657 ms on eurus-node4 (16634/22488)
16/11/15 10:11:15 INFO scheduler.DAGScheduler: Executor lost: 13 (epoch 1)
16/11/15 10:11:15 WARN scheduler.TaskSetManager: Lost task 10002.0 in stage 1.0 (TID 10880, eurus-node3): java.lang.OutOfMemoryError: Unable to acquire 51 bytes of memory, got 0
        at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
        at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:359)
        at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:380)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

16/11/15 10:11:15 INFO scheduler.TaskSetManager: Finished task 16766.0 in stage 1.0 (TID 17644) in 11508 ms on eurus-node4 (16635/22488)
16/11/15 10:11:15 INFO scheduler.TaskSetManager: Finished task 16574.0 in stage 1.0 (TID 17452) in 21354 ms on eurus-node1 (16636/22488)
16/11/15 10:11:15 INFO scheduler.TaskSetManager: Finished task 16733.0 in stage 1.0 (TID 17611) in 13122 ms on eurus-node2 (16637/22488)
16/11/15 10:11:15 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 13 from BlockManagerMaster.
16/11/15 10:11:15 INFO scheduler.TaskSetManager: Finished task 16735.0 in stage 1.0 (TID 17613) in 12888 ms on eurus-node2 (16638/22488)
16/11/15 10:11:15 INFO scheduler.TaskSetManager: Finished task 16723.0 in stage 1.0 (TID 17601) in 13572 ms on eurus-node2 (16639/22488)
16/11/15 10:11:15 INFO scheduler.TaskSetManager: Finished task 16521.0 in stage 1.0 (TID 17399) in 24803 ms on eurus-node4 (16640/22488)
16/11/15 10:11:15 INFO scheduler.TaskSetManager: Finished task 16722.0 in stage 1.0 (TID 17600) in 13621 ms on eurus-node2 (16641/22488)
16/11/15 10:11:15 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(13, eurus-node3, 42404)
16/11/15 10:11:15 WARN scheduler.TaskSetManager: Lost task 16962.0 in stage 1.0 (TID 17840, eurus-node4): FetchFailed(BlockManagerId(13, eurus-node3, 42404), shuffleId=4, mapId=35, reduceId=16962, message=
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /mnt/disk1/yarn/nm/usercache/root/appcache/application_1479174444013_0002/blockmgr-0161eaa3-ea02-40b2-81f7-082606db1271/1b/shuffle_4_35_0.index (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:191)
        at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:298)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:58)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)

        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: /mnt/disk1/yarn/nm/usercache/root/appcache/application_1479174444013_0002/blockmgr-0161eaa3-ea02-40b2-81f7-082606db1271/1b/shuffle_4_35_0.index (No such file or directory)
        at java.io.FileInputStream.open(Native Method)

> Cooperative Memory Management for Spillables
> --------------------------------------------
>
>                 Key: SPARK-14560
>                 URL: https://issues.apache.org/jira/browse/SPARK-14560
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.1
>            Reporter: Imran Rashid
>            Assignee: Lianhui Wang
>             Fix For: 2.0.0
>
>
> SPARK-10432 introduced cooperative memory management for SQL operators that 
> can spill; however, the {{Spillable}}s used by the old RDD API still do not 
> cooperate.  This can lead to memory starvation, in particular on a 
> shuffle-to-shuffle stage, eventually resulting in errors like:
> {noformat}
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Memory used in task 3081
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@69ab0291: 32.0 KB
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317230346 bytes of memory were used by task 3081 but are not associated with specific consumers
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317263114 bytes of memory are used for execution and 1710484 bytes of memory are used for storage
> 16/03/28 08:59:54 ERROR executor.Executor: Managed memory leak detected; size = 1317230346 bytes, TID = 3081
> 16/03/28 08:59:54 ERROR executor.Executor: Exception in task 533.0 in stage 3.0 (TID 3081)
> java.lang.OutOfMemoryError: Unable to acquire 75 bytes of memory, got 0
>         at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
>         at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
>         at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
>         at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
>         at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This can happen anytime the shuffle read side requires more memory than what 
> is available for the task.  Since the shuffle-read side doubles its memory 
> request each time, it can easily end up acquiring all of the available 
> memory, even if it does not use it.  E.g., say that after the final spill, the 
> shuffle-read side requires 10 MB more memory, and there is 15 MB of memory 
> available.  But if it starts at 2 MB, it will double to 4, 8, and then 
> request 16 MB of memory, and in fact get all available 15 MB.  Since the 15 
> MB of memory is sufficient, it will not spill, and will continue holding on 
> to all available memory.  But this leaves *no* memory available for the 
> shuffle-write side.  Since the shuffle-write side cannot request the 
> shuffle-read side to free up memory, this leads to an OOM.
> The simple solution is to make {{Spillable}} implement {{MemoryConsumer}} as 
> well, so RDDs can benefit from the cooperative memory management introduced 
> by SPARK-10342.
> Note that an additional improvement would be for the shuffle-read side to 
> simply release unused memory, without spilling, in case that would leave 
> enough memory, and only spill if that was inadequate.  However, that can come 
> as a later improvement.
> *Workaround*:  You can set 
> {{spark.shuffle.spill.numElementsForceSpillThreshold=N}} to force spilling to 
> occur every {{N}} elements, thus preventing the shuffle-read side from ever 
> grabbing all of the available memory.  However, this requires careful tuning 
> of {{N}} to specific workloads: too big, and you will still get an OOM; too 
> small, and there will be so much spilling that performance will suffer 
> drastically.  Furthermore, this workaround uses an *undocumented* 
> configuration with *no compatibility guarantees* for future versions of Spark.
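
To make the doubling arithmetic in the description concrete, here is a minimal, self-contained Scala sketch of the starvation scenario. The numbers and names are illustrative only; this is not the actual {{Spillable}} code.

{noformat}
// Illustrative only: a reader that doubles its allocation each round until it
// holds at least what it needs, drawing from a fixed pool of execution memory.
object DoublingGrab {
  def main(args: Array[String]): Unit = {
    val poolTotal = 15L // MB of execution memory available to the task
    val needed    = 10L // MB the shuffle-read side still needs after the final spill
    var held      = 2L  // MB the shuffle-read side starts with

    while (held < needed) {
      val wanted  = held * 2                    // double the current allocation
      val granted = math.min(wanted, poolTotal) // the pool can hand out at most 15 MB
      println(s"requested $wanted MB, now holding $granted MB")
      held = granted
    }
    // held == 15 MB: more than the 10 MB actually needed, so the reader never
    // spills, yet the shuffle-write side is left with no execution memory and
    // fails with "Unable to acquire N bytes of memory, got 0".
  }
}
{noformat}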
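
The proposed fix is to make {{Spillable}} a {{MemoryConsumer}}. The rough Scala outline below shows what that cooperation could look like under the 2.0-era {{MemoryConsumer}} API; the class and helper names ({{CooperativeSpillable}}, {{forceSpill}}, {{currentMemory}}) are illustrative, not Spark's actual implementation.

{noformat}
// Rough outline only; names other than MemoryConsumer/TaskMemoryManager/spill
// are illustrative and do not match the real Spark classes.
import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}

abstract class CooperativeSpillable(tmm: TaskMemoryManager)
    extends MemoryConsumer(tmm) {

  /** Spill the in-memory collection to disk; returns true if anything was spilled. */
  protected def forceSpill(): Boolean

  /** Bytes of execution memory this consumer currently holds. */
  protected def currentMemory: Long

  // Invoked by the memory manager when another consumer (e.g. the shuffle-write
  // side) cannot obtain memory: spill and report how many bytes were released.
  override def spill(size: Long, trigger: MemoryConsumer): Long = {
    if (trigger == this) {
      0L // spilling in response to our own request is handled on the insert path
    } else {
      val held = currentMemory
      if (forceSpill()) held else 0L
    }
  }
}
{noformat}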
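
For reference, the workaround can be applied when building the driver's {{SparkConf}}, as in the fragment below. The threshold value is a placeholder and, as noted above, must be tuned per workload; the property is undocumented and may change between releases.

{noformat}
// Example fragment of a driver program; the threshold value is a placeholder.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-heavy-job")
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", "5000000")
val sc = new SparkContext(conf)
{noformat}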


