[
https://issues.apache.org/jira/browse/SPARK-27562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
feiwang updated SPARK-27562:
----------------------------
Description:
We've seen some shuffle data corruption during the shuffle read phase.
As described in SPARK-26089, Spark only checked small shuffle blocks before PR
#23543, which was proposed by ankuriitg.
PR #23543 made two changes/improvements:
1. Large blocks are checked up to maxBytesInFlight/3 bytes in a similar way as
smaller blocks, so if a large block is corrupt at the beginning, that block
will be re-fetched, and if that also fails, a FetchFailedException will be
thrown.
2. If a large block is corrupt after the first maxBytesInFlight/3 bytes, any
IOException thrown while reading the stream is converted to a
FetchFailedException. This is slightly more aggressive than originally
intended, but since the consumer of the stream may have already read and
processed some records, we can't just re-fetch the block; we need to fail the
whole task. We also considered adding a new type of TaskEndReason that would
retry the task a couple of times before failing the previous stage, but given
the complexity of that solution we decided not to proceed in that direction.
However, I think some problems remain in the current verification mechanism
for shuffle-transmitted data:
- A large block is only checked up to maxBytesInFlight/3 bytes when fetching
shuffle data, so corruption after that point cannot be detected during the
fetch phase, as described above.
- Only compressed or wrapped blocks are checked; I think we should also check
blocks that are not wrapped.
This proposal completes the verification mechanism for shuffle-transmitted
data.
First, we choose CRC32 as the checksum for shuffle data. CRC is also used for
checksum verification in Hadoop; it is simple and fast.
In the shuffle write phase, after completing the partitioned file, we compute
a CRC32 value for each partition and write these digests, together with the
partition offsets, into the shuffle index file.
For SortShuffleWriter and UnsafeShuffleWriter there is only one partitioned
file per shuffle map task, so computing the digests (one per partition, based
on the offsets recorded for that file) is cheap.
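The per-partition digest computation on the write side could be sketched as
follows (a hypothetical helper for illustration, not the actual Spark code;
the class and method names are assumptions):

```java
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.CRC32;

public class PartitionDigests {
    // Compute one CRC32 digest per partition of a single partitioned data
    // file, reusing the partition lengths the writer already records when
    // building the shuffle index file.
    public static long[] compute(File dataFile, long[] partitionLengths) throws IOException {
        long[] digests = new long[partitionLengths.length];
        byte[] buf = new byte[2048]; // small, reused scratch buffer
        try (FileInputStream in = new FileInputStream(dataFile)) {
            for (int p = 0; p < partitionLengths.length; p++) {
                CRC32 crc = new CRC32();
                long remaining = partitionLengths[p];
                while (remaining > 0) {
                    int read = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                    if (read < 0) {
                        throw new EOFException("unexpected end of partitioned file");
                    }
                    crc.update(buf, 0, read);
                    remaining -= read;
                }
                digests[p] = crc.getValue();
            }
        }
        return digests;
    }
}
```

Since the file is read sequentially once, this adds only a single extra pass
over data the writer has just produced.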
For BypassMergeSortShuffleWriter, the number of reduce partitions is smaller
than spark.shuffle.sort.bypassMergeThreshold, so the cost of computing the
digests is acceptable.
In the shuffle read phase, the digest value is passed along with the block
data. We recompute the digest of the received data and compare it with the
original digest value.
Recomputing the digest only needs an additional 2048-byte buffer for the CRC32
computation.
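The read-side check could look like this minimal sketch (the helper name is an
assumption; it only shows the fixed-buffer CRC32 pass described above):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.CRC32;

public class DigestVerifier {
    // Stream a fetched block through CRC32 using a small fixed buffer and
    // compare the result against the digest shipped with the block.
    public static boolean verify(InputStream in, long expected) throws IOException {
        CRC32 crc = new CRC32();
        byte[] buf = new byte[2048]; // the only extra memory needed per block
        int read;
        while ((read = in.read(buf)) != -1) {
            crc.update(buf, 0, read);
        }
        return crc.getValue() == expected;
    }
}
```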
After recomputing, we reset the input stream over the received data: if the
stream supports mark/reset, we simply reset it; otherwise it comes from a
FileSegmentManagedBuffer and we recreate it.
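The rewind step could be sketched as below (illustrative names, not Spark
code; in Spark the non-resettable case would be a FileSegmentManagedBuffer
whose stream the caller can recreate):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

public class StreamRewinder {
    // After the digest pass has consumed the stream, rewind it for the real
    // consumer: reset a mark-supporting stream, otherwise recreate it.
    public static InputStream rewindOrRecreate(InputStream in, Supplier<InputStream> recreate)
            throws IOException {
        if (in.markSupported()) {
            in.reset(); // assumes mark() was called before the digest pass
            return in;
        }
        in.close();
        return recreate.get(); // e.g. reopen the underlying file segment
    }
}
```

Note that the reset path only works if mark() was called with a read limit at
least as large as the block before the digest was computed.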
So the verification mechanism proposed here for shuffle-transmitted data is
cheap and complete.
> Complete the verification mechanism for shuffle transmitted data
> ----------------------------------------------------------------
>
> Key: SPARK-27562
> URL: https://issues.apache.org/jira/browse/SPARK-27562
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle
> Affects Versions: 2.4.0
> Reporter: feiwang
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]