[ https://issues.apache.org/jira/browse/SPARK-56873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tengfei Huang updated SPARK-56873:
----------------------------------
    Description: 
Currently, the bounded branch of `getSortedIterator()` does roughly the following:
{code:java}
boundedMerger = new UnsafeSorterBoundedSpillMerger(...);          // step 1
readingIterator = new SpillableIterator(...);                     // step 2 – volatile publish
return boundedMerger.merge(spillWriters, readingIterator);        // step 3 – snapshot taken INSIDE merge()
{code}
`UnsafeSorterBoundedSpillMerger.merge()` snapshots its inputs via `new ArrayList<>(spillWriters)` at the top, so the snapshot is taken in step 3, *after* `readingIterator` is published in step 2.

Once `readingIterator` is non-null, a sibling consumer's `acquireExecutionMemory()` call that picks this sorter as the spill victim is routed to `readingIterator.spill()`. That call (a) appends a new writer to the live `spillWriters` list and (b) rebinds `readingIterator.upstream` to read that same new file. If this happens between step 2 and step 3, the snapshot taken inside `merge()` includes the new writer, *and* the final merge round also pulls from `readingIterator`, whose upstream now points at the new writer. The bounded merger therefore reads the new spill file twice, silently producing duplicate records in the sorted output.
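The following minimal, single-threaded sketch replays the interleaving deterministically. It is a toy model under stated assumptions and is not Spark code:

* A spill file is modeled as a `List<Integer>`.
* `ReadingIterator` stands in for `SpillableIterator`.
* `mergeAll` stands in for `UnsafeSorterBoundedSpillMerger.merge()`. It concatenates and sorts instead of doing a real k-way merge.
* All class and method names here are hypothetical.

`publishThenSnapshot()` follows the step 2 then step 3 order described above, with a sibling-triggered spill landing in between. `snapshotThenPublish()` is an assumed reordering. It is shown only to illustrate why the order matters and is not necessarily what the actual patch does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical toy model of the race; not Spark's UnsafeExternalSorter API.
final class SpillRaceDemo {

  // Stand-in for SpillableIterator: reads in-memory records until a spill
  // rebinds it to the freshly written spill file.
  static final class ReadingIterator {
    List<Integer> upstream;

    ReadingIterator(List<Integer> inMemory) {
      this.upstream = inMemory;
    }

    // Stand-in for readingIterator.spill(): (a) append a new "writer" to the
    // live spillWriters list, (b) rebind upstream to that same new file.
    void spill(List<List<Integer>> spillWriters) {
      List<Integer> newFile = new ArrayList<>(upstream);
      spillWriters.add(newFile);
      upstream = newFile;
    }
  }

  // Stand-in for merge(): reads every file in the snapshot, then whatever the
  // reading iterator currently points at, and sorts the result.
  static List<Integer> mergeAll(List<List<Integer>> snapshot, ReadingIterator it) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> file : snapshot) {
      out.addAll(file);
    }
    out.addAll(it.upstream);
    Collections.sort(out);
    return out;
  }

  // One pre-existing spill file holding records 1 and 3.
  private static List<List<Integer>> initialSpills() {
    List<List<Integer>> spillWriters = new ArrayList<>();
    spillWriters.add(new ArrayList<>(Arrays.asList(1, 3)));
    return spillWriters;
  }

  // Original order: publish (step 2), sibling spill, then snapshot (step 3).
  static List<Integer> publishThenSnapshot() {
    List<List<Integer>> spillWriters = initialSpills();
    ReadingIterator it = new ReadingIterator(new ArrayList<>(Arrays.asList(2, 4)));
    it.spill(spillWriters); // sibling consumer picks this sorter as victim
    List<List<Integer>> snapshot = new ArrayList<>(spillWriters);
    return mergeAll(snapshot, it);
  }

  // Assumed reordering: snapshot first, so a later spill cannot leak into it.
  static List<Integer> snapshotThenPublish() {
    List<List<Integer>> spillWriters = initialSpills();
    List<List<Integer>> snapshot = new ArrayList<>(spillWriters);
    ReadingIterator it = new ReadingIterator(new ArrayList<>(Arrays.asList(2, 4)));
    it.spill(spillWriters);
    return mergeAll(snapshot, it);
  }

  public static void main(String[] args) {
    System.out.println(publishThenSnapshot()); // [1, 2, 2, 3, 4, 4]
    System.out.println(snapshotThenPublish()); // [1, 2, 3, 4]
  }
}
```

With the original order, records 2 and 4 are read once through the snapshot and again through the rebound iterator. With the snapshot taken before publication, the new spill file is reachable only through `readingIterator`, so each record appears once.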

  was:
Currently, the bounded branch of `getSortedIterator()` does roughly the following:

```java
boundedMerger = new UnsafeSorterBoundedSpillMerger(...);          // step 1
readingIterator = new SpillableIterator(...);                     // step 2 – volatile publish
return boundedMerger.merge(spillWriters, readingIterator);        // step 3 – snapshot taken INSIDE merge()
```

`UnsafeSorterBoundedSpillMerger.merge()` snapshots its inputs via `new ArrayList<>(spillWriters)` at the top, so the snapshot is taken in step 3, **after** `readingIterator` is published in step 2.

Once `readingIterator` is non-null, a sibling consumer's `acquireExecutionMemory()` call that picks this sorter as the spill victim is routed to `readingIterator.spill()`. That call (a) appends a new writer to the live `spillWriters` list and (b) rebinds `readingIterator.upstream` to read that same new file. If this happens between step 2 and step 3, the snapshot taken inside `merge()` includes the new writer, **and** the final merge round also pulls from `readingIterator`, whose upstream now points at the new writer. The bounded merger therefore reads the new spill file twice, silently producing duplicate records in the sorted output.


> Fix potential race condition in UnsafeExternalSorter with multiple round boundedMerger
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-56873
>                 URL: https://issues.apache.org/jira/browse/SPARK-56873
>             Project: Spark
>          Issue Type: Task
>          Components: Spark Core
>    Affects Versions: 4.2.0
>            Reporter: Tengfei Huang
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
