[ https://issues.apache.org/jira/browse/SPARK-56873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tengfei Huang updated SPARK-56873:
----------------------------------
Description:
Currently, the bounded branch of `getSortedIterator()` does roughly the following:
```java
boundedMerger = new UnsafeSorterBoundedSpillMerger(...);    // step 1
readingIterator = new SpillableIterator(...);               // step 2 -- volatile publish
return boundedMerger.merge(spillWriters, readingIterator);  // step 3 -- snapshot taken INSIDE merge()
```
`UnsafeSorterBoundedSpillMerger.merge()` snapshots its inputs via
`new ArrayList<>(spillWriters)` at the top, so the snapshot is taken in step 3,
**after** `readingIterator` is published in step 2.
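The copy made by `new ArrayList<>(spillWriters)` is a point-in-time snapshot: it captures every writer appended before the copy and none appended after it, so *when* the copy is taken decides which spill files the merger sees. A minimal sketch using plain `java.util` lists, assuming strings as stand-ins for spill writers (`SnapshotTiming` and `snapshotSize` are hypothetical names, not Spark code):

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotTiming {
  // Hypothetical helper: appends `appendsBefore` writers, takes a copy the way
  // merge() does, then appends `appendsAfter` more writers to the live list.
  // Returns how many writers the copy ended up with.
  public static int snapshotSize(int appendsBefore, int appendsAfter) {
    List<String> spillWriters = new ArrayList<>(List.of("spill-0"));
    for (int i = 1; i <= appendsBefore; i++) {
      spillWriters.add("spill-" + i);
    }
    // Point-in-time copy, analogous to `new ArrayList<>(spillWriters)` in merge().
    List<String> snapshot = new ArrayList<>(spillWriters);
    for (int i = 0; i < appendsAfter; i++) {
      spillWriters.add("late-" + i); // not visible through `snapshot`
    }
    return snapshot.size();
  }

  public static void main(String[] args) {
    System.out.println(snapshotSize(1, 0)); // 2: an append before the copy is captured
    System.out.println(snapshotSize(0, 1)); // 1: an append after the copy is not
  }
}
```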
Once `readingIterator` is non-null, a sibling consumer's
`acquireExecutionMemory()` call that picks this sorter as the spill victim is
routed to `readingIterator.spill()`, which (a) appends a new writer to the live
`spillWriters` list and (b) rebinds `readingIterator.upstream` to read that
same new file. If this happens between step 2 and step 3, the snapshot taken
inside `merge()` includes the new writer, **and** the final merge round also
pulls from `readingIterator` (whose upstream now points at the new writer). The
bounded merger therefore reads the new spill file twice, silently producing
duplicate records in the sorted output.
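The interleaving can be reproduced deterministically in a single-threaded model. Everything below is hypothetical: `SpillFile`, `ReadingIterator`, `merge` and `run` are simplified stand-ins for the spill writers, `SpillableIterator` and the bounded merger's final round, not Spark's actual classes, and the sibling consumer's spill is simply invoked at the point where it would race. The `snapshotBeforePublish` variant shows one ordering that avoids the duplicate (snapshot `spillWriters` before publishing `readingIterator`); that is an assumption about the shape of a fix, not necessarily what the actual patch does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SpillRaceDemo {
  // Hypothetical stand-in for a spill file: just its records.
  static final class SpillFile {
    final List<Integer> records;
    SpillFile(List<Integer> records) { this.records = records; }
  }

  // Hypothetical stand-in for SpillableIterator: reads in-memory records
  // until spill() redirects it to the file it just wrote.
  static final class ReadingIterator {
    List<Integer> upstream;
    ReadingIterator(List<Integer> inMemory) { this.upstream = inMemory; }

    // Models readingIterator.spill(): (a) writes the remaining records to a new
    // file appended to the LIVE list, (b) rebinds upstream to that same file.
    void spill(List<SpillFile> liveSpillWriters) {
      SpillFile newFile = new SpillFile(new ArrayList<>(upstream));
      liveSpillWriters.add(newFile);
      upstream = newFile.records;
    }
  }

  // Models the final merge round: reads every snapshotted file plus the reading
  // iterator. Concatenate-then-sort stands in for a real k-way merge.
  static List<Integer> merge(List<SpillFile> snapshot, ReadingIterator it) {
    List<Integer> out = new ArrayList<>();
    for (SpillFile f : snapshot) out.addAll(f.records);
    out.addAll(it.upstream);
    out.sort(Comparator.naturalOrder());
    return out;
  }

  public static List<Integer> run(boolean snapshotBeforePublish) {
    List<SpillFile> spillWriters = new ArrayList<>();
    spillWriters.add(new SpillFile(new ArrayList<>(List.of(1, 4)))); // earlier spill
    List<Integer> inMemory = new ArrayList<>(List.of(2, 3));          // not yet spilled

    List<SpillFile> snapshot = null;
    if (snapshotBeforePublish) snapshot = new ArrayList<>(spillWriters);
    ReadingIterator readingIterator = new ReadingIterator(inMemory);  // step 2: publish
    readingIterator.spill(spillWriters);  // racing spill from a sibling consumer
    if (!snapshotBeforePublish) snapshot = new ArrayList<>(spillWriters); // step 3
    return merge(snapshot, readingIterator);
  }

  public static void main(String[] args) {
    System.out.println("snapshot after publish:  " + run(false)); // [1, 2, 2, 3, 3, 4]
    System.out.println("snapshot before publish: " + run(true));  // [1, 2, 3, 4]
  }
}
```

With the snapshot taken after the publish, records 2 and 3 (the in-memory data that `spill()` moved into the new file) are emitted twice: once through the snapshotted writer and once through the rebound `readingIterator`.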
> Fix potential race condition in UnsafeExternalSorter with multiple round boundedMerger
> --------------------------------------------------------------------------------------
>
> Key: SPARK-56873
> URL: https://issues.apache.org/jira/browse/SPARK-56873
> Project: Spark
> Issue Type: Task
> Components: Spark Core
> Affects Versions: 4.2.0
> Reporter: Tengfei Huang
> Priority: Major
> Labels: pull-request-available
--
This message was sent by Atlassian Jira
(v8.20.10#820010)