[ 
https://issues.apache.org/jira/browse/SPARK-27562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

feiwang updated SPARK-27562:
----------------------------
    Description: 
We've seen some shuffle data corruption during the shuffle read phase.

As described in SPARK-26089, Spark only checked small shuffle blocks before PR 
#23453, which was proposed by ankuriitg.

Two changes/improvements were made in PR #23453:

1. Large blocks are checked up to maxBytesInFlight/3 bytes in the same way as 
smaller blocks, so if a large block is corrupt near the beginning, that block 
will be re-fetched, and if that also fails, a FetchFailedException will be 
thrown.
2. If a large block is corrupt beyond the first maxBytesInFlight/3 bytes, then 
any IOException thrown while reading the stream will be converted to a 
FetchFailedException. This is slightly more aggressive than was originally 
intended, but since the consumer of the stream may have already read and 
processed some records, we can't just re-fetch the block; we need to fail the 
whole task. Additionally, we also considered adding a new type of 
TaskEndReason, which would retry the task a couple of times before failing the 
previous stage, but given the complexity involved in that solution we decided 
not to proceed in that direction.

However, I think there are still some problems with the current verification 
mechanism for shuffle transmitted data:

- For a large block, only the first maxBytesInFlight/3 bytes are checked when 
fetching shuffle data. So if a large block is corrupt beyond that point, the 
corruption cannot be detected in the fetch phase. This is described in the 
previous section.
- Only the compressed or wrapped blocks are checked; I think we should also 
check the blocks that are not wrapped.

We complete the verification mechanism for shuffle transmitted data as follows.

First, we choose CRC32 as the checksum for shuffle data.

CRC is also used for checksum verification in Hadoop; it is simple and fast.
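As a quick illustration, the JVM already ships a fast CRC32 implementation in 
java.util.zip.CRC32, which can be fed incrementally, one buffer at a time; a 
minimal sketch:

```java
import java.util.zip.CRC32;

public class Crc32Demo {
    public static void main(String[] args) {
        // java.util.zip.CRC32 accumulates the checksum incrementally;
        // update() can be called once per buffer of streamed data.
        CRC32 crc = new CRC32();
        byte[] data = "123456789".getBytes();
        crc.update(data, 0, data.length);
        // 0xCBF43926 is the well-known CRC-32 check value for "123456789".
        System.out.println(Long.toHexString(crc.getValue()));  // prints cbf43926
    }
}
```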

In the shuffle write phase, after completing the partitioned file, we compute 
the CRC32 value for each partition and then write these digests along with the 
offsets into the shuffle index file.

For SortShuffleWriter and UnsafeShuffleWriter, there is only one partitioned 
file per shuffle map task, so computing the digests (one per partition, based 
on the offsets in this partitioned file) is cheap.

For BypassMergeSortShuffleWriter, the number of reduce partitions is smaller 
than the bypass merge threshold, so the cost of the digest computation is 
acceptable.
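The per-partition digest computation could be sketched as below. This is an 
illustration only, not Spark's actual writer code; the partitionLengths 
argument is a stand-in for the per-partition lengths recorded in the index 
file:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.CRC32;

public class PartitionDigests {
    // Compute one CRC32 per partition of a single partitioned file,
    // walking the file once and using the per-partition lengths that
    // the shuffle index records.
    static long[] digests(String file, long[] partitionLengths) throws IOException {
        long[] crcs = new long[partitionLengths.length];
        byte[] buf = new byte[2048];
        try (FileInputStream in = new FileInputStream(file)) {
            for (int p = 0; p < partitionLengths.length; p++) {
                CRC32 crc = new CRC32();
                long remaining = partitionLengths[p];
                while (remaining > 0) {
                    int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                    if (n < 0) throw new IOException("file shorter than index claims");
                    crc.update(buf, 0, n);
                    remaining -= n;
                }
                crcs[p] = crc.getValue();
            }
        }
        return crcs;
    }
}
```

Since the file is read sequentially exactly once, the extra cost on top of 
writing the partitioned file is a single streaming pass.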

In the shuffle read phase, the digest value is passed along with the block 
data, and we recompute the digest of the fetched data and compare it with the 
original digest value.

Recomputing the digest of the fetched data only needs a small additional 
buffer (2048 bytes) for computing the CRC32 value.

After recomputing, we reset the fetched data's input stream: if it supports 
mark/reset we simply reset it; otherwise it is a FileSegmentManagedBuffer and 
we recreate it.
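The read-side check could look like the following sketch. The helper name and 
structure are hypothetical, not Spark's actual fetch code; it covers only the 
mark/reset case, since a file-segment-backed buffer would be recreated instead:

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.CRC32;

public class ShuffleBlockVerifier {
    // Recompute the CRC32 of a fetched block with a small scratch buffer,
    // compare it with the digest shipped alongside the block, then rewind
    // the stream so the consumer can read it from the start.
    static InputStream verify(InputStream in, long expectedCrc) throws IOException {
        if (!in.markSupported()) in = new BufferedInputStream(in);
        in.mark(Integer.MAX_VALUE);
        CRC32 crc = new CRC32();
        byte[] buf = new byte[2048];   // the small extra buffer noted above
        int n;
        while ((n = in.read(buf)) != -1) crc.update(buf, 0, n);
        if (crc.getValue() != expectedCrc)
            throw new IOException("corrupt shuffle block: crc mismatch");
        in.reset();                    // hand the intact stream back
        return in;
    }
}
```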

Thus, the verification mechanism proposed here for shuffle transmitted data is 
both cheap and complete.


> Complete the verification mechanism for shuffle transmitted data
> ----------------------------------------------------------------
>
>                 Key: SPARK-27562
>                 URL: https://issues.apache.org/jira/browse/SPARK-27562
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 2.4.0
>            Reporter: feiwang
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
