aidar-stripe opened a new pull request, #3100:
URL: https://github.com/apache/celeborn/pull/3100
### What changes were proposed in this pull request?
Replace the `HashSet` of `PartitionLocation`s with a concurrent version.
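For illustration, a minimal sketch of the shape of this change (the `ConcurrentSetSketch` object and the import path for `PartitionLocation` are illustrative assumptions, not the actual patch):
```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.celeborn.common.protocol.PartitionLocation

object ConcurrentSetSketch {
  // Before: a plain HashSet; writes to it are not guaranteed to be
  // visible to threads that read it without synchronization.
  // val locations = new java.util.HashSet[PartitionLocation]()

  // After: a concurrent set backed by ConcurrentHashMap, whose contents
  // can be read safely without holding a lock.
  val locations: java.util.Set[PartitionLocation] =
    ConcurrentHashMap.newKeySet[PartitionLocation]()
}
```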
### Why are the changes needed?
We are seeing race conditions between `handleGetReducerFileGroup` and
`tryFinalCommit`, where reducers complete without processing a partition even
though data exists for it.
### Problematic logs
On the driver side:
```
25/01/31 14:23:02 {} INFO
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23
commit files complete. File count 23200 using 240180 ms
...
25/01/31 14:23:02 {} INFO
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23
partition 11931-0: primary lost, use replica PartitionLocation[
id-epoch:11931-0
host-rpcPort-pushPort-fetchPort-replicatePort:10.68.138.242-39557-35555-37139-39685
mode:REPLICA
peer:(empty)
storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true,
filePath=}
mapIdBitMap:null].
...
25/01/31 14:23:02 {} INFO
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Succeed to
handle stageEnd for 23.
```
On the executor side:
```
25/01/31 14:23:02 {executorId=92, jobId=28, partitionId=420, stageId=74,
taskAttemptId=82047} INFO org.apache.celeborn.client.ShuffleClientImpl: Shuffle
23 request reducer file group success using 59315 ms, result partition size
12000
...
25/01/31 14:40:54 {executorId=92, partitionId=11931, taskAttemptId=93846}
INFO org.apache.spark.executor.Executor: Running task 11931.0 in stage 74.0
(TID 93846)
25/01/31 14:40:54 {jobId=28, executorId=92, taskAttemptId=93846,
partitionId=11931, stageId=74} INFO
org.apache.spark.shuffle.celeborn.SparkShuffleManager: Shuffle 24 write mode is
changed to SORT because partition count 12000 is greater than threshold 2000
25/01/31 14:40:54 {executorId=92, jobId=28, partitionId=11931, stageId=74,
taskAttemptId=93846} INFO
org.apache.spark.shuffle.celeborn.CelebornShuffleReader: BatchOpenStream for 0
cost 0ms
25/01/31 14:40:54 {} WARN org.apache.celeborn.client.ShuffleClientImpl:
Shuffle data is empty for shuffle 23 partition 11931.
```
### How was this patch tested?
No additional tests: I tried to reproduce the race, but we have only seen it
happen with a high number of nodes and over long execution times.
### More explanation on why/how this happens
```scala
// write path
override def setStageEnd(shuffleId: Int): Unit = {
  getReducerFileGroupRequest synchronized {
    stageEndShuffleSet.add(shuffleId)
  }
  ...
}

// read path
override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: Int): Unit = {
  // Quick return for ended stage, avoid occupy sync lock.
  if (isStageEnd(shuffleId)) {
    replyGetReducerFileGroup(context, shuffleId)
  } else {
    getReducerFileGroupRequest.synchronized {
      ...
    }
  }
}

override def isStageEnd(shuffleId: Int): Boolean = {
  stageEndShuffleSet.contains(shuffleId)
}
```
Since the only concurrency guarantee between the read and write paths comes
from `ConcurrentHashMap`'s volatile values, there is no guarantee that the
contents of a plain `HashSet` stored as a value will be fully visible to the
reader thread.
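To make the hazard concrete, here is a self-contained sketch of the same pattern (illustrative only, not Celeborn code; `fileGroups`, `writer`, and `reader` are invented names): the writer safely publishes a plain `HashSet` through a `ConcurrentHashMap` and then keeps mutating it, so a reader on the unsynchronized quick path can observe the set before those mutations land.
```scala
import java.util.concurrent.ConcurrentHashMap

object HashSetVisibilityDemo {
  val fileGroups = new ConcurrentHashMap[Int, java.util.Set[Int]]()

  def writer(): Unit = {
    val locations = new java.util.HashSet[Int]()
    // put() is a safe publication, but it only covers the state of the
    // set as of this volatile write (here: empty).
    fileGroups.put(23, locations)
    // Mutations after publication are plain writes: nothing orders them
    // before a reader's get() + contains().
    locations.add(11931)
  }

  def reader(): Unit = {
    // The volatile read in get() guarantees we see the put()...
    val locations = fileGroups.get(23)
    // ...but the later add() raced with us, so the set may look empty,
    // which matches the "Shuffle data is empty" symptom above.
    if (locations != null && locations.isEmpty) {
      println("Shuffle data is empty for shuffle 23 partition 11931.")
    }
  }
}
```
With a `ConcurrentHashMap.newKeySet` in place of the `HashSet`, each `add` is a properly synchronized write, so a completed add is visible to any subsequent `contains` or iteration.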
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]