[ 
https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329720#comment-14329720
 ] 

Imran Rashid commented on SPARK-5928:
-------------------------------------

Actually, there *is* some weirdness in how spark handles this failure.  
[~kayousterhout], maybe you can take a look -- I think it might be related to 
this change: 
https://github.com/apache/spark/commit/18ad59e2c6b7bd009e8ba5ebf8fcf99630863029,
 specifically here: 
https://github.com/apache/spark/commit/18ad59e2c6b7bd009e8ba5ebf8fcf99630863029#diff-bad3987c83bd22d46416d3dd9d208e76R488.
  Maybe the issue existed before that change, I am not sure, but at least you 
might be a bit more knowledgeable about this code :)

So the strange thing is, every time there is a {{FetchFailedException}}, spark 
marks the stage as failed, with logs something like:

{noformat}
15/02/20 13:09:24 INFO YarnClientClusterScheduler: Removed TaskSet 1.0, whose 
tasks have all completed, from pool 
15/02/20 13:09:24 INFO DAGScheduler: Marking Stage 1 (count at <console>:15) as 
failed due to a fetch failure from Stage 0 (map at <console>:15)
15/02/20 13:09:24 INFO DAGScheduler: Stage 1 (count at <console>:15) failed in 
0.736 s
15/02/20 13:09:24 INFO DAGScheduler: Resubmitting Stage 0 (map at <console>:15) 
and Stage 1 (count at <console>:15) due to fetch failure
15/02/20 13:09:24 INFO DAGScheduler: Executor lost: 2 (epoch 1)
15/02/20 13:09:24 INFO BlockManagerMasterActor: Trying to remove executor 2 
from BlockManagerMaster.
15/02/20 13:09:24 INFO BlockManagerMasterActor: Removing block manager 
BlockManagerId(2, imran-3.ent.cloudera.com, 45980)
{noformat}

but then it shortly re-registers that same block manager:

{noformat}
15/02/20 13:09:24 INFO BlockManagerMasterActor: Registering block manager 
imran-3.ent.cloudera.com:45980 with 2.0 GB RAM, BlockManagerId(2, 
imran-3.ent.cloudera.com, 45980)
{noformat}

then it reruns the same stage, and goes through the same thing over and over 
again.  It only breaks out of this loop if instead I get the 
{{java.io.Exception}} from snappy mentioned above (no idea why that exception 
only occurs sometimes).  In that case, we instead get a a message like:

{noformat}
15/02/20 13:19:23 WARN TaskSetManager: Lost task 0.0 in stage 1.3 (TID 7, 
imran-2.ent.cloudera.com): java.io.IOException: failed to uncompress the chunk: 
PARSING_ERROR(2)
        at 
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
...
15/02/20 13:19:23 INFO TaskSetManager: Starting task 0.1 in stage 1.3 (TID 8, 
imran-3.ent.cloudera.com, PROCESS_LOCAL, 1056 bytes)
{noformat}

so it just retries the *task*, rather than failing the stage.  If we're lucky 
enough to have that exception occur 4 times in a row, before any 
{{FetchFailedException}} s, then spark aborts the job and the stage retrying 
loop is finally broken.  In my last try w/ the code given for this issue, it 
resulted in 6 retries of the stage.

This should probably go under another issue, but I don't quite understand what 
is going on well enough to formulate that issue clearly.  To me it looks like 
{{FetchFailedException}} should just result in a normal task failure, but it 
looks like that special handling was put in there for some reason.  But I can 
just file another issue if its not clear what is going on here.

thanks

> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
>                 Key: SPARK-5928
>                 URL: https://issues.apache.org/jira/browse/SPARK-5928
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>
> If a shuffle block is over 2GB, the shuffle fails, with an uninformative 
> exception.  The tasks get retried a few times and then eventually the job 
> fails.
> Here is an example program which can cause the exception:
> {code}
>     val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>       val n = 3e3.toInt
>       val arr = new Array[Byte](n)
>       //need to make sure the array doesn't compress to something small
>       scala.util.Random.nextBytes(arr)
>       arr
>     }
>     rdd.map { x => (1, x)}.groupByKey().count()
> {code}
> Note that you can't trigger this exception in local mode, it only happens on 
> remote fetches.   I triggered these exceptions running with 
> {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, 
> imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, 
> imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 
> 2147483647: 3021252889 - discarded
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>       at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame 
> length exceeds 2147483647: 3021252889 - discarded
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>       ... 1 more
> )
> {noformat}
> or if you use "spark.shuffle.blockTransferService=nio", then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
> imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, 
> imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed 
> with ACK that signalled a remote error: java.lang.IllegalArgumentException: 
> Size exceeds Integer.MAX_VALUE
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>       at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>       at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that 
> signalled a remote error: java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>       at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
>       at 
> org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       ... 3 more
> )
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to