[ https://issues.apache.org/jira/browse/SPARK-56873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tengfei Huang updated SPARK-56873:
----------------------------------
Description:
Currently, the bounded branch of `getSortedIterator()` does roughly the following:
{code:java}
boundedMerger = new UnsafeSorterBoundedSpillMerger(...);   // step 1
readingIterator = new SpillableIterator(...);              // step 2: volatile publish
return boundedMerger.merge(spillWriters, readingIterator); // step 3: snapshot taken INSIDE merge()
{code}
`UnsafeSorterBoundedSpillMerger.merge()` snapshots its inputs via `new
ArrayList<>(spillWriters)` at the top, so the snapshot is taken in step 3,
*after* `readingIterator` is published in step 2.
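Why the position of that copy matters can be shown with a minimal, single-threaded Java illustration. Plain `String`s stand in for spill writers, and `SnapshotTiming` and `snapshot()` are hypothetical names, not Spark code. A copy made with `new ArrayList<>(list)` holds every element appended before it and none appended after, so a copy taken in step 3 can contain a writer that was added after step 2.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustration only (not Spark code): new ArrayList<>(live) is a
 * point-in-time copy of the live list. Strings stand in for spill writers.
 */
public class SnapshotTiming {

    /** Returns a copy of the live list, taken either after or before one extra append. */
    static List<String> snapshot(boolean appendBeforeCopy) {
        List<String> spillWriters = new ArrayList<>(List.of("spill-0", "spill-1"));
        List<String> copy;
        if (appendBeforeCopy) {
            spillWriters.add("spill-2");           // the append lands first...
            copy = new ArrayList<>(spillWriters);  // ...so the copy includes it
        } else {
            copy = new ArrayList<>(spillWriters);  // the copy is taken first...
            spillWriters.add("spill-2");           // ...so this append is not visible in it
        }
        return copy;
    }

    public static void main(String[] args) {
        System.out.println("copy after append:  " + snapshot(true));  // [spill-0, spill-1, spill-2]
        System.out.println("copy before append: " + snapshot(false)); // [spill-0, spill-1]
    }
}
```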
Once `readingIterator` is non-null, a sibling consumer's
`acquireExecutionMemory()` call that picks this sorter as the spill victim is
routed to `readingIterator.spill()`, which (a) appends a new writer to the live
`spillWriters` list and (b) rebinds `readingIterator.upstream` to read that
same new file. If this happens between step 2 and step 3, the snapshot taken
inside `merge()` includes the new writer, *and* the final merge round also
pulls from `readingIterator`, whose upstream now points at the new writer. The
bounded merger therefore reads the new spill file twice, silently producing
duplicate records in the sorted output.
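The interleaving can be modeled deterministically. The sketch below is hypothetical: `SpillRaceModel`, `ReadingIterator`, `merge()` and `run()` are simplified stand-ins, not the real `UnsafeExternalSorter`, `SpillableIterator` or `UnsafeSorterBoundedSpillMerger` API. Spill files are `String`s, the sibling spill is injected by hand between step 2 and step 3 instead of arriving from another thread, and the `snapshotFirst` branch (copy `spillWriters` before publishing `readingIterator`) is only one possible reordering assumed for illustration, not necessarily the change adopted for this issue.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-threaded model of the race; names are illustrative, not Spark's API. */
public class SpillRaceModel {

    /** Stand-in for SpillableIterator: reads from a single upstream source. */
    static final class ReadingIterator {
        String upstream = "in-memory"; // initially reads the in-memory sorted records
        private final List<String> spillWriters;

        ReadingIterator(List<String> spillWriters) {
            this.spillWriters = spillWriters;
        }

        /** Stand-in for readingIterator.spill(). */
        void spill(String newFile) {
            spillWriters.add(newFile); // (a) appends to the LIVE list
            upstream = newFile;        // (b) rebinds upstream to the same new file
        }
    }

    /** Stand-in for merge(): copies its inputs, and the final round also reads the iterator. */
    static List<String> merge(List<String> inputs, ReadingIterator it) {
        List<String> reads = new ArrayList<>(inputs); // snapshot of spill files
        reads.add(it.upstream);                       // final round pulls from the iterator
        return reads;
    }

    /** Runs steps 1-3 with a sibling spill forced in right after step 2. */
    static List<String> run(boolean snapshotFirst) {
        List<String> spillWriters = new ArrayList<>(List.of("spill-0", "spill-1"));
        // Assumed reordering: copy the inputs before the iterator becomes visible.
        List<String> snapshot = snapshotFirst ? new ArrayList<>(spillWriters) : null;

        ReadingIterator readingIterator = new ReadingIterator(spillWriters); // step 2: publish
        readingIterator.spill("spill-2"); // simulated sibling acquireExecutionMemory() picks this sorter

        // step 3: the buggy order lets merge() copy the live, already-mutated list
        return merge(snapshotFirst ? snapshot : spillWriters, readingIterator);
    }

    static long timesRead(List<String> reads, String file) {
        return reads.stream().filter(file::equals).count();
    }

    public static void main(String[] args) {
        List<String> buggy = run(false); // [spill-0, spill-1, spill-2, spill-2]
        List<String> fixed = run(true);  // [spill-0, spill-1, spill-2]
        System.out.println("snapshot after publish:  " + buggy + " -> spill-2 read " + timesRead(buggy, "spill-2") + "x");
        System.out.println("snapshot before publish: " + fixed + " -> spill-2 read " + timesRead(fixed, "spill-2") + "x");
    }
}
```

With the snapshot taken after publication, `spill-2` appears twice in the merge inputs (once from the copied list, once through the rebound iterator). With the copy taken first, it is read exactly once, through the iterator, which is correct because the iterator's in-memory records now live in that file.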
was:
Currently, the bounded branch of `getSortedIterator()` does roughly the following:
```java
boundedMerger = new UnsafeSorterBoundedSpillMerger(...);   // step 1
readingIterator = new SpillableIterator(...);              // step 2: volatile publish
return boundedMerger.merge(spillWriters, readingIterator); // step 3: snapshot taken INSIDE merge()
```
`UnsafeSorterBoundedSpillMerger.merge()` snapshots its inputs via `new
ArrayList<>(spillWriters)` at the top, so the snapshot is taken in step 3,
**after** `readingIterator` is published in step 2.
Once `readingIterator` is non-null, a sibling consumer's
`acquireExecutionMemory()` call that picks this sorter as the spill victim is
routed to `readingIterator.spill()`, which (a) appends a new writer to the live
`spillWriters` list and (b) rebinds `readingIterator.upstream` to read that
same new file. If this happens between step 2 and step 3, the snapshot taken
inside `merge()` includes the new writer, **and** the final merge round also
pulls from `readingIterator`, whose upstream now points at the new writer. The
bounded merger therefore reads the new spill file twice, silently producing
duplicate records in the sorted output.
> Fix potential race condition in UnsafeExternalSorter with multiple round boundedMerger
> --------------------------------------------------------------------------------------
>
> Key: SPARK-56873
> URL: https://issues.apache.org/jira/browse/SPARK-56873
> Project: Spark
> Issue Type: Task
> Components: Spark Core
> Affects Versions: 4.2.0
> Reporter: Tengfei Huang
> Priority: Major
> Labels: pull-request-available
--
This message was sent by Atlassian Jira
(v8.20.10#820010)