[ 
https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14332490#comment-14332490
 ] 

Imran Rashid commented on SPARK-5928:
-------------------------------------

so I'm pretty sure the issue is not just large records, its from large shuffle 
blocks.  I tried a slight variations on my original failing program, with the 
same shuffle volume but smaller individual records, and both fail.

{code}
    val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ i =>
      val n = 3e3.toInt
      val arr = new Array[Byte](n)
      //need to make sure the array doesn't compress to something small
      scala.util.Random.nextBytes(arr)
      (2 * i, arr)
    }
    rdd.partitionBy(new org.apache.spark.HashPartitioner(2)).count()
{code}

or
 
{code}
    val rdd = sc.parallelize(1 to 2e6.toInt, 2).mapPartitionsWithIndex{ 
case(part,itr) =>
      if(part == 0) {
        itr.map{ ignore =>
          val n = 3e3.toInt
          val arr = new Array[Byte](n)
          //need to make sure the array doesn't compress to something small
          scala.util.Random.nextBytes(arr)
          arr
        }
      } else {
        Iterator()
      }
    }
    rdd.map{x => (scala.util.Random.nextInt % 1000 * 2, 
x)}.groupByKey(2).count()
{code}


However, the failure mode is slightly different, and I don't fully understand.  
It does always have a {{FetchFailedException:  Adjusted frame length exceeds 
2147483647}}.  But then during the retry, it decides to change from two tasks 
in the second stage, to just one task.  I don't understand why the number of 
tasks would change.  I understand that one of my executors might get removed, 
and so both tasks might get executed locally -- but I don't see why it would 
convert it into one task.

In any case, these programs should have smaller records, but still have a 2gb 
remote fetch shuffle block.  Note that I'm trying to play some games w/ getting 
data from one partition to another to make a remote fetch -- but I can't 
actually guarantee that.  Sometimes the programs succeed when they are local 
fetches.  So they don't exhibit the issue everytime, but they do fail 
regularly.  Sometimes they go on to succeed (I guess after one of the executors 
is removed completely, and so there are no remote fetches anymore), and 
sometimes they fail repeatedly.

> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
>                 Key: SPARK-5928
>                 URL: https://issues.apache.org/jira/browse/SPARK-5928
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>
> If a shuffle block is over 2GB, the shuffle fails, with an uninformative 
> exception.  The tasks get retried a few times and then eventually the job 
> fails.
> Here is an example program which can cause the exception:
> {code}
>     val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>       val n = 3e3.toInt
>       val arr = new Array[Byte](n)
>       //need to make sure the array doesn't compress to something small
>       scala.util.Random.nextBytes(arr)
>       arr
>     }
>     rdd.map { x => (1, x)}.groupByKey().count()
> {code}
> Note that you can't trigger this exception in local mode, it only happens on 
> remote fetches.   I triggered these exceptions running with 
> {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, 
> imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, 
> imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 
> 2147483647: 3021252889 - discarded
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>       at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame 
> length exceeds 2147483647: 3021252889 - discarded
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>       ... 1 more
> )
> {noformat}
> or if you use "spark.shuffle.blockTransferService=nio", then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
> imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, 
> imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed 
> with ACK that signalled a remote error: java.lang.IllegalArgumentException: 
> Size exceeds Integer.MAX_VALUE
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>       at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>       at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that 
> signalled a remote error: java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>       at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
>       at 
> org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       ... 3 more
> )
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to