[ https://issues.apache.org/jira/browse/SPARK-27562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826602#comment-16826602 ]
feiwang commented on SPARK-27562:
---------------------------------

[~irashid]

> Complete the verification mechanism for shuffle transmitted data
> ----------------------------------------------------------------
>
>                 Key: SPARK-27562
>                 URL: https://issues.apache.org/jira/browse/SPARK-27562
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 2.4.0
>            Reporter: feiwang
>            Priority: Major
>
> We've seen some shuffle data corruption during the shuffle read phase.
> As described in SPARK-26089, before PR #23453 (proposed by ankuriitg) Spark
> only checked small shuffle blocks.
> PR #23453 made two changes/improvements:
> 1. Large blocks are checked up to maxBytesInFlight/3 bytes in the same way
> as smaller blocks, so if a large block is corrupt near the start, that block
> will be re-fetched, and if that also fails, a FetchFailedException will be
> thrown.
> 2. If a large block is corrupt beyond maxBytesInFlight/3 bytes, any
> IOException thrown while reading the stream is converted to a
> FetchFailedException. This is slightly more aggressive than originally
> intended, but since the consumer of the stream may have already read and
> processed some records, we can't simply re-fetch the block; we have to fail
> the whole task. We also considered adding a new type of TaskEndReason that
> would retry the task a couple of times before failing the previous stage,
> but given the complexity involved in that solution we decided not to proceed
> in that direction.
> However, I think the current verification mechanism for shuffle transmitted
> data still has some problems:
> - A large block is only checked up to maxBytesInFlight/3 bytes when fetching
> shuffle data, so if a large block is corrupt beyond maxBytesInFlight/3
> bytes, the corruption cannot be detected in the data-fetch phase. This has
> been described in the previous section.
> - Only compressed or wrapped blocks are checked; I think we should also
> check the blocks that are not wrapped.
>
> We complete the verification mechanism for shuffle transmitted data as
> follows.
> First, we choose CRC32 as the checksum for verifying shuffle data. CRC is
> also used for checksum verification in Hadoop; it is simple and fast.
> In the shuffle write phase, after completing the partitioned file, we
> compute the CRC32 value for each partition and then write these digests,
> together with the partition offsets, into the shuffle index file.
> For SortShuffleWriter and the unsafe shuffle writer there is only one
> partitioned file per shuffle map task, so computing the digests (one per
> partition, using the offsets of that partitioned file) is cheap.
> For BypassMergeSortShuffleWriter, the number of reduce partitions is less
> than the bypass-merge threshold, so the cost of digest computation is
> acceptable.
> In the shuffle read phase, the digest value is passed along with the block
> data, and we recompute the digest of the fetched data and compare it with
> the original digest value.
> Recomputing the digest of the fetched data only needs an additional buffer
> (2048 bytes) for the CRC32 computation.
> After recomputing, we reset the fetched data's input stream: if it supports
> mark/reset we only need to reset it; otherwise it is a
> FileSegmentManagedBuffer and we need to recreate it.
> So this verification mechanism proposed for shuffle transmitted data is
> cheap and complete.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
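As a rough illustration of the write-phase digest step described above, here is a minimal Java sketch (not Spark's actual implementation): after the partitioned file is complete, a CRC32 digest is computed per partition from the same offsets that go into the shuffle index file. The class name `PartitionDigests`, the method `computeDigests`, and the in-memory `byte[]` standing in for the partitioned file are all illustrative assumptions.

```java
import java.util.zip.CRC32;

public class PartitionDigests {
    // offsets has numPartitions + 1 entries; partition i spans
    // [offsets[i], offsets[i + 1]) in the partitioned file. This mirrors
    // the layout of offsets in a shuffle index file.
    static long[] computeDigests(byte[] partitionedFile, long[] offsets) {
        long[] digests = new long[offsets.length - 1];
        for (int i = 0; i < digests.length; i++) {
            CRC32 crc = new CRC32();
            // Digest only this partition's byte range.
            crc.update(partitionedFile,
                       (int) offsets[i],
                       (int) (offsets[i + 1] - offsets[i]));
            digests[i] = crc.getValue();
        }
        return digests;
    }
}
```

Because the digests are derived purely from the offsets and the finished file, the extra pass is a sequential read of data the writer just produced, which is why the cost is low for the single-file writers.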
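The read-phase check can be sketched the same way: recompute the CRC32 of the fetched block using a small 2048-byte scratch buffer, compare it with the digest shipped alongside the block, and reset the stream so the downstream consumer still sees the whole block. This is a hypothetical sketch under the assumption of a mark-supported stream; `BlockVerifier` and `verify` are illustrative names, and the non-mark-supported case (recreating a `FileSegmentManagedBuffer`, per the proposal) is not shown.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.CRC32;

public class BlockVerifier {
    // The proposal notes only a 2048-byte buffer is needed for the check.
    static final int BUFFER_SIZE = 2048;

    // Returns true if the recomputed digest matches the expected one.
    // Assumes in.markSupported() is true; otherwise the underlying
    // buffer would have to be recreated instead of reset.
    static boolean verify(InputStream in, long expectedDigest) throws IOException {
        in.mark(Integer.MAX_VALUE);
        CRC32 crc = new CRC32();
        byte[] buffer = new byte[BUFFER_SIZE];
        int n;
        while ((n = in.read(buffer)) != -1) {
            crc.update(buffer, 0, n);
        }
        in.reset(); // rewind so the consumer reads the full block again
        return crc.getValue() == expectedDigest;
    }
}
```

A mismatch here can be surfaced before any record is consumed, which is exactly the window in which a re-fetch (rather than a whole-task failure) is still possible.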