[
https://issues.apache.org/jira/browse/SPARK-27876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
feiwang updated SPARK-27876:
----------------------------
Affects Version/s: (was: 3.1.0)
(was: 2.4.3)
2.3.2
> Split large shuffle partition to multi-segments to enable transfer oversize
> shuffle partition block.
> ----------------------------------------------------------------------------------------------------
>
> Key: SPARK-27876
> URL: https://issues.apache.org/jira/browse/SPARK-27876
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 2.3.2
> Reporter: feiwang
> Priority: Major
>
> There is a size limit on shuffle reads.
> If a shuffle partition block is larger than Integer.MAX_VALUE bytes (2 GB)
> and is fetched from a remote host, an exception is thrown.
> {code:java}
> 2019-05-24 06:46:30,333 [9935] - WARN [shuffle-client-6-2:TransportChannelHandler@78] - Exception in connection from hadoop3747.jd.163.org/10.196.76.172:7337
> java.lang.IllegalArgumentException: Too large frame: 2991947178
>     at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
>     at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
>     at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> {code}
> The task then throws a FetchFailedException.
> The task is retried, but it succeeds only if it is rescheduled onto an
> executor on the same host as the oversized shuffle partition block.
> However, if there is more than one oversized (>2 GB) shuffle partition
> block, the task never succeeds, and this may cause the application to fail.
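
To illustrate why the remote fetch fails, here is a minimal sketch of a frame-size guard like the one the stack trace points to. The class FrameSizeGuard, the method checkFrameSize, and the exact comparison are assumptions made for illustration and are not Spark's actual code; only the 2 GB cap and the error message text come from the report above.

```java
final class FrameSizeGuard {
    // Assumed cap: the report states that blocks larger than
    // Integer.MAX_VALUE bytes cannot be fetched remotely.
    static final long MAX_FRAME_SIZE = Integer.MAX_VALUE;

    // Hypothetical guard: rejects frame lengths above the cap, using the
    // message format seen in the log above.
    static void checkFrameSize(long frameSize) {
        if (frameSize > MAX_FRAME_SIZE) {
            throw new IllegalArgumentException("Too large frame: " + frameSize);
        }
    }

    public static void main(String[] args) {
        checkFrameSize(1024L); // a small frame passes the guard
        try {
            checkFrameSize(2991947178L); // the frame size reported in the log
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the frame length must be held in a long to exceed the cap at all, any block above the cap is rejected before its payload is read, which matches the failure described above.
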
> In this PR, I propose a new way to fetch shuffle blocks: when a shuffle
> partition block is oversized, it is fetched in multiple requests.
> A brief summary:
> 1. Set a shuffle fetch threshold (SHUFFLE_FETCH_THRESHOLD) of Int.MaxValue
> (2 GB).
> 2. Add a parameter, spark.shuffle.fetch.split, to control whether large
> partitions are fetched in multiple requests.
> 3. When creating the mapStatus, calculate the number of segments of each
> shuffle block (Math.ceil(size / SHUFFLE_FETCH_THRESHOLD)), and record the
> segment count only when it is larger than 1.
> 4. Define a new BlockId type, ShuffleBlockSegmentId, used to identify the
> fetch method.
> 5. When spark.shuffle.fetch.split is enabled, send ShuffleBlockSegmentId
> messages to the shuffle service instead of ShuffleBlockId messages.
> 6. For a ShuffleBlockId, use a sequence of ManagedBuffers to represent its
> block instead of a single ManagedBuffer.
> 7. In ShuffleBlockFetcherIterator, create a PriorityBlockQueue for each
> ShuffleBlockId to store the fetched SegmentManagedBuffers. When all segments
> of a ShuffleBlockId have been fetched, take the corresponding sequence of
> ManagedBuffers (ordered by segmentId) as the success result for that
> ShuffleBlockId.
> 8. On the shuffle service side, if the blockId in openBlocks is a
> ShuffleBlockSegmentId, respond with a ManagedBuffer for that segment of the
> block; if the blockId is a ShuffleBlockId, respond with a ManagedBuffer for
> the whole block, as before.
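
Steps 3 and 7 above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in the PR: the class ShuffleSegmentSketch, the type FetchedSegment, and the methods segmentCount, segmentRange, and assemble are hypothetical names, and java.util.concurrent.PriorityBlockingQueue stands in for the "PriorityBlockQueue" mentioned in step 7.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

final class ShuffleSegmentSketch {
    // Step 1: the threshold from the description, Int.MaxValue bytes.
    static final long SHUFFLE_FETCH_THRESHOLD = Integer.MAX_VALUE;

    // Step 3: ceil(size / threshold), computed in integer arithmetic.
    static int segmentCount(long blockSize, long threshold) {
        if (blockSize <= threshold) {
            return 1; // the block fits in a single fetch
        }
        return (int) ((blockSize + threshold - 1) / threshold);
    }

    // Hypothetical helper: {offset, length} of one segment within the block.
    static long[] segmentRange(long blockSize, long threshold, int segmentId) {
        long offset = segmentId * threshold;
        long length = Math.min(threshold, blockSize - offset);
        return new long[] {offset, length};
    }

    // Stand-in for a fetched segment buffer; ordered by segmentId.
    static final class FetchedSegment implements Comparable<FetchedSegment> {
        final int segmentId;
        final long length;

        FetchedSegment(int segmentId, long length) {
            this.segmentId = segmentId;
            this.length = length;
        }

        @Override
        public int compareTo(FetchedSegment other) {
            return Integer.compare(segmentId, other.segmentId);
        }
    }

    // Step 7: once every segment has arrived, drain the queue in segmentId
    // order. Returns an empty list (and leaves the queue untouched) while
    // segments are still missing.
    static List<FetchedSegment> assemble(PriorityBlockingQueue<FetchedSegment> fetched,
                                         int expectedSegments) {
        List<FetchedSegment> ordered = new ArrayList<>();
        if (fetched.size() < expectedSegments) {
            return ordered;
        }
        FetchedSegment next;
        while ((next = fetched.poll()) != null) {
            ordered.add(next);
        }
        return ordered;
    }

    public static void main(String[] args) {
        long blockSize = 2991947178L; // the frame size from the log above
        int count = segmentCount(blockSize, SHUFFLE_FETCH_THRESHOLD);
        PriorityBlockingQueue<FetchedSegment> queue = new PriorityBlockingQueue<>();
        // Simulate segments arriving out of order.
        for (int id = count - 1; id >= 0; id--) {
            long[] range = segmentRange(blockSize, SHUFFLE_FETCH_THRESHOLD, id);
            queue.add(new FetchedSegment(id, range[1]));
        }
        for (FetchedSegment s : assemble(queue, count)) {
            System.out.println("segment " + s.segmentId + ": " + s.length + " bytes");
        }
    }
}
```

The priority queue lets segments arrive in any order while still yielding them by segmentId, so the reader can consume the buffers as one contiguous block.
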