[ 
https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329584#comment-14329584
 ] 

Imran Rashid commented on SPARK-5928:
-------------------------------------

Here are some thoughts on we *might* fix this.  I am definitely not saying an 
eventual solution has to do it this way, just figured that I should write down 
my thoughts in case its useful to anyone who takes a look at this in the 
future.  There may be alternate solutions that are worth considering as well.

Right now, we always try to send at least one full shuffle block in each 
message.  We could change the implementation to break one shuffle block into 
mutliple messages.  The key locations for that are 
https://github.com/apache/spark/blob/master/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L93
 on the receiving end, which assumes each "chunk" corresponds to exactly one 
block.  And on the sending side, that happens here: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala#L55.
  The protocol there could change to allow for one block to get sent in 
multiple chunks.

Ok I know that isn't a lot of info, but maybe at least the pointers to the code 
are helpful to somebody :)


> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
>                 Key: SPARK-5928
>                 URL: https://issues.apache.org/jira/browse/SPARK-5928
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>
> If a shuffle block is over 2GB, the shuffle fails, with an uninformative 
> exception.  The tasks get retried a few times and then eventually the job 
> fails.
> Here is an example program which can cause the exception:
> {code}
>     val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>       val n = 3e3.toInt
>       val arr = new Array[Byte](n)
>       //need to make sure the array doesn't compress to something small
>       scala.util.Random.nextBytes(arr)
>       arr
>     }
>     rdd.map { x => (1, x)}.groupByKey().count()
> {code}
> Note that you can't trigger this exception in local mode, it only happens on 
> remote fetches.   I triggered these exceptions running with 
> {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, 
> imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, 
> imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 
> 2147483647: 3021252889 - discarded
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>       at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame 
> length exceeds 2147483647: 3021252889 - discarded
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>       ... 1 more
> )
> {noformat}
> or if you use "spark.shuffle.blockTransferService=nio", then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
> imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, 
> imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed 
> with ACK that signalled a remote error: java.lang.IllegalArgumentException: 
> Size exceeds Integer.MAX_VALUE
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>       at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>       at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that 
> signalled a remote error: java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>       at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
>       at 
> org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       ... 3 more
> )
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to