[ https://issues.apache.org/jira/browse/SPARK-27562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106846#comment-17106846 ]

Apache Spark commented on SPARK-27562:
--------------------------------------

User 'turboFei' has created a pull request for this issue:
https://github.com/apache/spark/pull/28525

> Complete the verification mechanism for shuffle transmitted data
> ----------------------------------------------------------------
>
>                 Key: SPARK-27562
>                 URL: https://issues.apache.org/jira/browse/SPARK-27562
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 3.1.0
>            Reporter: feiwang
>            Priority: Major
>
> We've seen some shuffle data corruption during the shuffle read phase.
> As described in SPARK-26089, before PR #23453 (proposed by ankuriitg) Spark
> only checked small shuffle blocks.
> PR #23453 made two changes/improvements:
> 1. Large blocks are checked up to maxBytesInFlight/3 bytes in the same way as
> small blocks, so if a large block is corrupt near its start, it is
> re-fetched, and if the re-fetched block is also corrupt, a
> FetchFailedException is thrown.
> 2. If a large block is corrupt beyond its first maxBytesInFlight/3 bytes, any
> IOException thrown while reading the stream is converted to a
> FetchFailedException. This is slightly more aggressive than originally
> intended, but since the consumer of the stream may have already read and
> processed some records, we can't just re-fetch the block; we need to fail
> the whole task. We also considered adding a new type of TaskEndReason that
> would retry the task a couple of times before failing the previous stage,
> but given the complexity of that solution we decided not to pursue it.
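The fetch-time policy from PR #23453 can be modeled roughly as follows. This is a minimal Python sketch, not Spark's code: `fetch`, `prefix_ok`, `read_rest`, and `FetchFailedError` are hypothetical stand-ins (the last one for Spark's FetchFailedException), Python's OSError plays the role of Java's IOException, and `limit` plays the role of maxBytesInFlight/3.

```python
from typing import Callable, Iterable, Iterator


class FetchFailedError(Exception):
    """Hypothetical stand-in for Spark's FetchFailedException."""


def fetch_checked(fetch: Callable[[], bytes],
                  prefix_ok: Callable[[bytes], bool],
                  limit: int) -> bytes:
    """Fetch a block and validate only its first `limit` bytes.

    A corrupt prefix triggers exactly one re-fetch; if the re-fetched
    block is also corrupt, the problem is reported as a fetch failure.
    """
    for _attempt in range(2):
        block = fetch()
        if prefix_ok(block[:limit]):
            return block
    raise FetchFailedError("block is still corrupt after re-fetching")


def read_rest(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Stream the remainder of a block to the consumer.

    Records may already have been processed, so an I/O error here cannot be
    fixed by re-fetching: it is converted to a fetch failure (failing the task).
    """
    try:
        for chunk in chunks:
            yield chunk
    except OSError as err:
        raise FetchFailedError("I/O error while reading block") from err
```

The split into two functions mirrors the two cases above: corruption caught early can be retried transparently, while corruption found mid-stream can only fail the task.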
> However, I think some problems still exist with the current verification
> mechanism for transmitted shuffle data:
> - A large block is only checked up to maxBytesInFlight/3 bytes when shuffle
> data is fetched, so if it is corrupt beyond that point, the corruption
> cannot be detected in the fetch phase, as described above.
> - Only compressed or wrapped blocks are checked; I think we should also check
> blocks that are not wrapped.
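The first limitation can be illustrated with a simplified model. In this Python sketch the existing check is treated as if it covered only the first `limit` bytes of a block (in Spark it actually works by reading through the wrapped stream rather than by comparing checksums); the helper names and sizes are illustrative assumptions. A single flipped bit beyond `limit` leaves the checked prefix unchanged, while a CRC32 over the whole block detects it.

```python
import zlib


def flip_bit(block: bytes, pos: int) -> bytes:
    """Return a copy of `block` with the lowest bit of byte `pos` flipped."""
    corrupted = bytearray(block)
    corrupted[pos] ^= 0x01
    return bytes(corrupted)


def prefix_check_passes(sent: bytes, received: bytes, limit: int) -> bool:
    """Model of a check that only inspects the first `limit` bytes."""
    return zlib.crc32(received[:limit]) == zlib.crc32(sent[:limit])


def full_check_passes(sent: bytes, received: bytes) -> bool:
    """Whole-block CRC32 comparison."""
    return zlib.crc32(received) == zlib.crc32(sent)


block = bytes(range(256)) * 40   # a 10240-byte block
limit = 1024                     # stands in for maxBytesInFlight/3
bad = flip_bit(block, 5000)      # corruption beyond the checked prefix

print(prefix_check_passes(block, bad, limit))  # True: corruption missed
print(full_check_passes(block, bad))           # False: corruption detected
```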
> We propose to complete the verification mechanism for transmitted shuffle
> data as follows.
> First, we choose CRC32 for checksum verification of shuffle data. CRC is also
> used for checksum verification in Hadoop; it is simple and fast.
> In the shuffle write phase, after the partitionedFile is complete, we compute
> the CRC32 value of each partition and write these digests, together with the
> indexes (partition offsets), into the shuffle index file.
> For SortShuffleWriter and UnsafeShuffleWriter, a ShuffleMapTask produces only
> one partitionedFile, so computing the digests (the digest of each partition
> is computed over the byte range given by this partitionedFile's indexes) is
> cheap.
> For BypassMergeSortShuffleWriter, the number of reduce partitions is at most
> bypassMergeThreshold, so the cost of computing the digests is acceptable.
> In the shuffle read phase, the digest is passed along with the block data,
> and we recompute the digest of the received data and compare it with the
> original digest.
> Recomputing the digest of the received data needs only one additional
> 2048-byte buffer for computing the CRC32 value.
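The read-side check can be sketched in Python with zlib.crc32, which accepts a running value, so the digest can be computed incrementally through one reusable 2048-byte buffer. `ChecksumMismatchError`, `stream_crc32`, and `verify_block` are hypothetical names, and a blocking stream is assumed; in Spark the stream would come from the fetched block's buffer.

```python
import io
import zlib
from typing import BinaryIO

BUFFER_SIZE = 2048  # the single extra buffer mentioned in the proposal


class ChecksumMismatchError(Exception):
    """Hypothetical error for a block whose recomputed digest differs."""


def stream_crc32(stream: BinaryIO, buffer_size: int = BUFFER_SIZE) -> int:
    """Compute CRC32 over the rest of `stream`, reusing one fixed buffer."""
    buf = bytearray(buffer_size)
    view = memoryview(buf)
    crc = 0
    while True:
        n = stream.readinto(buf)
        if not n:  # nothing read: end of stream (blocking stream assumed)
            break
        crc = zlib.crc32(view[:n], crc)  # fold the new bytes into the running CRC
    return crc


def verify_block(stream: BinaryIO, expected_crc: int) -> None:
    """Raise if the data read from `stream` does not match the sender's digest."""
    actual = stream_crc32(stream)
    if actual != expected_crc:
        raise ChecksumMismatchError(
            f"expected crc32 {expected_crc:#010x}, got {actual:#010x}")


payload = bytes(range(256)) * 20                        # 5120 bytes: three buffer fills
verify_block(io.BytesIO(payload), zlib.crc32(payload))  # passes silently
```

Memory use stays constant regardless of block size, since only the fixed buffer and a 32-bit running value are kept.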
> After recomputing, we reset the input stream of the received data: if it
> supports mark/reset (markSupported), we only need to reset it; otherwise it
> is backed by a FileSegmentManagedBuffer, and we need to recreate the stream.
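The reset step can be sketched with Python's closest analogs: seekable()/tell()/seek() stand in for Java's markSupported()/mark()/reset(), and a caller-supplied `reopen` function stands in for opening a new stream over the FileSegmentManagedBuffer (as ManagedBuffer.createInputStream() does). `crc32_of` and `verified_stream` are hypothetical names; this is a sketch of the idea, not Spark's implementation.

```python
import io
import zlib
from typing import BinaryIO, Callable


def crc32_of(stream: BinaryIO, chunk_size: int = 2048) -> int:
    """Incremental CRC32 over the rest of `stream`."""
    crc = 0
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        crc = zlib.crc32(chunk, crc)
    return crc


def verified_stream(stream: BinaryIO,
                    reopen: Callable[[], BinaryIO],
                    expected_crc: int) -> BinaryIO:
    """Verify the block, then return a stream positioned at its first byte."""
    start = stream.tell() if stream.seekable() else None  # analog of mark()
    actual = crc32_of(stream)
    if actual != expected_crc:
        stream.close()
        raise ValueError("shuffle block checksum mismatch")
    if start is not None:
        stream.seek(start)  # analog of reset(): rewind the same stream
        return stream
    stream.close()          # consumed and not rewindable
    return reopen()         # recreate it from the underlying file segment
```

Rewinding avoids a second open when the stream allows it; the fallback costs one extra open of the file segment.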
> Therefore, the proposed verification mechanism for transmitted shuffle data
> is both cheap and complete.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
