aidar-stripe opened a new pull request, #3100:
URL: https://github.com/apache/celeborn/pull/3100

   ### What changes were proposed in this pull request?
   Replace the `HashSet` of `PartitionLocation`s with a concurrent version.
   
   ### Why are the changes needed?
   We are seeing race conditions between `handleGetReducerFileGroup` and 
`tryFinalCommit`, where reducers complete without processing a partition even 
though it contains data.
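   
   The race can be shown in miniature. In the sketch below (class and field names are illustrative, not Celeborn's actual ones), a writer adds to a plain `HashSet` under a lock, while a fast-path reader checks membership without taking that lock. With no shared lock or concurrent collection between them, there is no happens-before edge, so the reader may observe a stale or partially constructed set:
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   // Illustrative sketch (hypothetical names): the write path marks a
   // shuffle as ended under a lock, but the fast-path read skips the
   // lock. With a plain HashSet there is no happens-before edge between
   // the two threads, so the unguarded read is a data race.
   class UnsafeStageTracker {
       private final Set<Integer> ended = new HashSet<>();
       private final Object lock = new Object();
   
       void setStageEnd(int shuffleId) {
           synchronized (lock) {             // write path: guarded
               ended.add(shuffleId);
           }
       }
   
       boolean isStageEnd(int shuffleId) {
           return ended.contains(shuffleId); // read path: NOT guarded
       }
   }
   ```
   
   The race only manifests across threads; within a single thread the calls behave as expected, which is part of why it is hard to reproduce.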
   
   ### Problematic logs
   On the driver side:
   ```
   25/01/31 14:23:02 {} INFO 
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 
commit files complete. File count 23200 using 240180 ms
   ...
   25/01/31 14:23:02 {} INFO 
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 
partition 11931-0: primary lost, use replica PartitionLocation[
     id-epoch:11931-0
     
host-rpcPort-pushPort-fetchPort-replicatePort:10.68.138.242-39557-35555-37139-39685
     mode:REPLICA
     peer:(empty)
     storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, 
filePath=}
     mapIdBitMap:null].
   ...
   25/01/31 14:23:02 {} INFO 
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Succeed to 
handle stageEnd for 23.
   ```
   
   
   On the executor side:
   ```
   25/01/31 14:23:02 {executorId=92, jobId=28, partitionId=420, stageId=74, 
taskAttemptId=82047} INFO org.apache.celeborn.client.ShuffleClientImpl: Shuffle 
23 request reducer file group success using 59315 ms, result partition size 
12000
   ...
   25/01/31 14:40:54 {executorId=92, partitionId=11931, taskAttemptId=93846} 
INFO org.apache.spark.executor.Executor: Running task 11931.0 in stage 74.0 
(TID 93846)
   25/01/31 14:40:54 {jobId=28, executorId=92, taskAttemptId=93846, 
partitionId=11931, stageId=74} INFO 
org.apache.spark.shuffle.celeborn.SparkShuffleManager: Shuffle 24 write mode is 
changed to SORT because partition count 12000 is greater than threshold 2000
   25/01/31 14:40:54 {executorId=92, jobId=28, partitionId=11931, stageId=74, 
taskAttemptId=93846} INFO 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader: BatchOpenStream for 0 
cost 0ms
   25/01/31 14:40:54 {} WARN org.apache.celeborn.client.ShuffleClientImpl: 
Shuffle data is empty for shuffle 23 partition 11931.
   ```
   
   ### How was this patch tested?
   No additional tests for this: I tried to reproduce it, but we have only 
seen it happen with a high number of nodes and over long execution times.
   
   ### More explanation on why/how this happens
   
   ```
   // write path
    override def setStageEnd(shuffleId: Int): Unit = {
       getReducerFileGroupRequest synchronized {
         stageEndShuffleSet.add(shuffleId)
       }
   ....
   
   // read path
    override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: 
Int): Unit = {
       // Quick return for ended stage, avoid occupy sync lock.
       if (isStageEnd(shuffleId)) {
         replyGetReducerFileGroup(context, shuffleId)
       } else {
         getReducerFileGroupRequest.synchronized {
   ...
   
   override def isStageEnd(shuffleId: Int): Boolean = {
       stageEndShuffleSet.contains(shuffleId)
     }
   ```
   
   Since the concurrency guarantees between the read and write paths rely only 
on ConcurrentHashMap's volatile values, there is no guarantee that the contents 
of a plain HashSet will be fully visible to the reader thread.
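   
   A minimal sketch of the fix (hedged; the actual change is in this PR's diff): backing the set with `ConcurrentHashMap.newKeySet()` makes every `add` safely published, so the lock-free fast-path `contains` is guaranteed to see it. Names below are illustrative:
   
   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical sketch of the fix: back stageEndShuffleSet with a
   // concurrent set so a lock-free reader is guaranteed to observe adds
   // performed by other threads. ConcurrentHashMap's internals provide
   // the happens-before edge that a plain HashSet lacks.
   class SafeStageTracker {
       private final Set<Integer> stageEndShuffleSet =
           ConcurrentHashMap.newKeySet();
   
       void setStageEnd(int shuffleId) {
           stageEndShuffleSet.add(shuffleId);      // safely published
       }
   
       boolean isStageEnd(int shuffleId) {
           return stageEndShuffleSet.contains(shuffleId); // safe lock-free read
       }
   }
   ```
   
   With this, the quick-return check in `handleGetReducerFileGroup` no longer needs the surrounding lock for visibility.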
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
