[ https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565577#comment-16565577 ]
Apache Spark commented on SPARK-24989:
--------------------------------------

User 'xuanyuanking' has created a pull request for this issue:
https://github.com/apache/spark/pull/21945

BlockFetcher should retry while getting OutOfDirectMemoryError
---------------------------------------------------------------

                Key: SPARK-24989
                URL: https://issues.apache.org/jira/browse/SPARK-24989
            Project: Spark
         Issue Type: Improvement
         Components: Shuffle
   Affects Versions: 2.2.0
           Reporter: Li Yuanjian
           Priority: Major
        Attachments: FailedStage.png

h3. Description

In our practice, this problem can be reproduced reliably by a high-parallelism job migrated from MapReduce to Spark; the relevant metrics are listed below:

||Item||Value||
|spark.executor.instances|1000|
|spark.executor.cores|5|
|Task count of the shuffle write stage|18038|
|Task count of the shuffle read stage|80000|

After the shuffle write stage finished successfully, the shuffle read stage started and kept failing with FetchFailed. Each fetch request requires the netty server to allocate a 16MB buffer (detailed stack trace below), so the huge number of fetch requests exhausts the default maxDirectMemory rapidly, even after we bumped io.netty.maxDirectMemory up to 50GB.

{code:java}
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530)
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:296)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
	at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
	at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	... 1 more
{code}
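For a rough sense of scale, a back-of-envelope sketch (not Spark code): the 16MB buffer size and the direct-memory limit are taken from the error message above, while the remark about concurrency is only an illustration of the job sizes listed in the table.

{code:scala}
// Estimate how many in-flight 16MB netty receive buffers fit under the
// direct-memory limit reported in the stack trace above.
object DirectMemoryEstimate extends App {
  val bytesPerFetchBuffer = 16L * 1024 * 1024   // 16777216 bytes per receive buffer
  val maxDirectMemory     = 21474836480L        // the "max" value in the error (20 GB)
  val buffersBeforeOom    = maxDirectMemory / bytesPerFetchBuffer
  // With 1000 executors * 5 cores fetching from 18038 map outputs, a single shuffle
  // server can easily see far more than this many fetch responses in flight at once.
  println(s"Concurrent 16MB buffers before OutOfDirectMemoryError: $buffersBeforeOom") // 1280
}
{code}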
h3. Solution

Add retry support for this error. Bumping up the java option io.netty.maxDirectMemory and trying a larger spark.shuffle.io.retryWait can help the job pass, but I think we need more discussion about load balancing of fetch requests; the retry support is probably necessary first.
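A minimal sketch of the mitigation described above, using the knobs named in this issue (spark.shuffle.io.retryWait, spark.shuffle.io.maxRetries, io.netty.maxDirectMemory). The concrete values are illustrative assumptions, not recommendations, and this is the workaround only, not the retry change proposed in the pull request.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// Workaround configuration sketch; values are examples and need cluster-specific tuning.
val conf = new SparkConf()
  .setAppName("shuffle-heavy-job")
  // Retry failed block fetches more times and wait longer between attempts.
  .set("spark.shuffle.io.maxRetries", "10")
  .set("spark.shuffle.io.retryWait", "30s")
  // Raise netty's direct-memory ceiling on the executors (50 GB, as tried in the description).
  .set("spark.executor.extraJavaOptions", "-Dio.netty.maxDirectMemory=53687091200")
val sc = new SparkContext(conf)
{code}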