[
https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14332490#comment-14332490
]
Imran Rashid commented on SPARK-5928:
-------------------------------------
So I'm pretty sure the issue is not just large records; it comes from large shuffle
blocks. I tried slight variations on my original failing program, with the
same shuffle volume but smaller individual records, and both fail.
{code}
val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { i =>
  val n = 3e3.toInt
  val arr = new Array[Byte](n)
  // need to make sure the array doesn't compress to something small
  scala.util.Random.nextBytes(arr)
  (2 * i, arr)
}
rdd.partitionBy(new org.apache.spark.HashPartitioner(2)).count()
{code}
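A back-of-the-envelope check (an illustrative sketch, not Spark code) of why this produces an over-2 GB block: every key {{2 * i}} is even, so a 2-way hash partitioner routes all 1e6 records to the same reduce partition, and that single block is roughly 1e6 * 3e3 = 3e9 bytes, which overflows a signed 32-bit length field:

```scala
// Illustrative arithmetic only -- not Spark code. Every key `2 * i` above is
// even, so with 2 reduce partitions all records land in one shuffle block of
// roughly 1e6 * 3e3 bytes (ignoring key and serialization overhead).
val records = 1e6.toLong        // records produced by the single map partition
val bytesPerRecord = 3e3.toLong // ~3 KB random payload per record

val blockBytes = records * bytesPerRecord

println(s"approximate block size: $blockBytes bytes") // 3000000000
println(s"Int.MaxValue:           ${Int.MaxValue}")   // 2147483647

// The block length cannot be represented in a signed 32-bit frame length,
// which matches "Adjusted frame length exceeds 2147483647" in the logs.
assert(blockBytes > Int.MaxValue)
```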
or
{code}
val rdd = sc.parallelize(1 to 2e6.toInt, 2).mapPartitionsWithIndex {
  case (part, itr) =>
    if (part == 0) {
      itr.map { ignore =>
        val n = 3e3.toInt
        val arr = new Array[Byte](n)
        // need to make sure the array doesn't compress to something small
        scala.util.Random.nextBytes(arr)
        arr
      }
    } else {
      Iterator()
    }
}
rdd.map { x => (scala.util.Random.nextInt % 1000 * 2, x) }.groupByKey(2).count()
{code}
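The second program funnels everything into one reducer the same way: the keys {{nextInt % 1000 * 2}} are all even, and a 2-partition hash partitioner sends every even key to partition 0. A simplified sketch of the assignment logic (mirroring Spark's {{Utils.nonNegativeMod}} as used by {{HashPartitioner}}; not the actual Spark classes):

```scala
// Simplified re-implementation of HashPartitioner's partition assignment,
// for illustration only (mirrors Spark's Utils.nonNegativeMod).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

def getPartition(key: Int, numPartitions: Int): Int =
  nonNegativeMod(key.##, numPartitions) // an Int's hash code is the value itself

// All keys produced above are even (nextInt % 1000 * 2), so with 2 reduce
// partitions every record lands in partition 0 -- again one ~3 GB block.
val sampleKeys = Seq.fill(1000)(scala.util.Random.nextInt % 1000 * 2)
assert(sampleKeys.forall(k => getPartition(k, 2) == 0))
```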
However, the failure mode is slightly different, and I don't fully understand it.
It does always produce a {{FetchFailedException: Adjusted frame length exceeds
2147483647}}. But then during the retry, it changes from two tasks in the
second stage to just one task. I don't understand why the number of tasks
would change. I understand that one of my executors might get removed, and so
both tasks might get executed locally -- but I don't see why it would convert
them into one task.
In any case, these programs have smaller records but still produce a 2 GB
shuffle block that must be fetched remotely. Note that I'm trying to play some
games with moving data from one partition to another to force a remote fetch --
but I can't actually guarantee that. Sometimes the programs succeed when the
fetches are local. So they don't exhibit the issue every time, but they do fail
regularly. Sometimes they go on to succeed (I guess after one of the executors
is removed completely, so there are no remote fetches anymore), and sometimes
they fail repeatedly.
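For reference, the netty-path failure comes from a frame-length guard: the decoder reads the frame length as a long but rejects anything above its configured maximum, which is {{Int.MaxValue}} here (that's the 2147483647 in the logs). A minimal sketch of that check (my simplified reading of {{LengthFieldBasedFrameDecoder}}'s behavior, not the actual Netty code):

```scala
// Minimal sketch of the frame-length guard behind the error above --
// a simplified stand-in for Netty's LengthFieldBasedFrameDecoder check,
// not the real implementation. TooLongFrame is a hypothetical stand-in
// for io.netty.handler.codec.TooLongFrameException.
final case class TooLongFrame(msg: String) extends Exception(msg)

def checkFrameLength(adjustedFrameLength: Long,
                     maxFrameLength: Long = Int.MaxValue): Unit =
  if (adjustedFrameLength > maxFrameLength)
    throw TooLongFrame(
      s"Adjusted frame length exceeds $maxFrameLength: $adjustedFrameLength - discarded")

// The ~3 GB block from the log trips the guard:
try checkFrameLength(3021252889L)
catch { case TooLongFrame(m) => println(m) }
```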
> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
> Key: SPARK-5928
> URL: https://issues.apache.org/jira/browse/SPARK-5928
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Reporter: Imran Rashid
>
> If a shuffle block is over 2GB, the shuffle fails, with an uninformative
> exception. The tasks get retried a few times and then eventually the job
> fails.
> Here is an example program which can cause the exception:
> {code}
> val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { ignore =>
>   val n = 3e3.toInt
>   val arr = new Array[Byte](n)
>   // need to make sure the array doesn't compress to something small
>   scala.util.Random.nextBytes(arr)
>   arr
> }
> rdd.map { x => (1, x) }.groupByKey().count()
> {code}
> Note that you can't trigger this exception in local mode, it only happens on
> remote fetches. I triggered these exceptions running with
> {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>   at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>   at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>   at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
>   at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
>   at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
>   at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
>   at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
>   at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>   at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>   ... 1 more
> )
> {noformat}
> or if you use "spark.shuffle.blockTransferService=nio", then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>   at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>   at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>   at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>   at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>   at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>   at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>   at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>   at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>   at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>   at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>   at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>   at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>   at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>   at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>   at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
>   at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
>   at org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
>   at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
>   at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>   ... 3 more
> )
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)