This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit 567e4b0a8b6dc488144babcc9db0e14dc40a54da
Author: Aidar Bariev <[email protected]>
AuthorDate: Thu Feb 27 10:27:12 2025 +0800

    [CELEBORN-1883] Replace HashSet with ConcurrentHashMap.newKeySet for 
ShuffleFileGroups
    
    ### What changes were proposed in this pull request?
    Replacing HashSet of PartitionLocations with concurrent version of it.
    
    ### Why are the changes needed?
    We are seeing some race conditions between `handleGetReducerFileGroup`& 
`tryFinalCommit`, where reducers complete without processing partition, even 
though there's data.
    
    ### Problematic logs
    On the driver side:
    ```
    25/01/31 14:23:02 {} INFO 
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 
commit files complete. File count 23200 using 240180 ms
    ...
    25/01/31 14:23:02 {} INFO 
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 
partition 11931-0: primary lost, use replica PartitionLocation[
      id-epoch:11931-0
      
host-rpcPort-pushPort-fetchPort-replicatePort:10.68.138.242-39557-35555-37139-39685
      mode:REPLICA
      peer:(empty)
      storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, 
filePath=}
      mapIdBitMap:null].
    ...
    25/01/31 14:23:02 {} INFO 
org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Succeed to 
handle stageEnd for 23.
    ```
    
    On the executor side:
    ```
    25/01/31 14:23:02 {executorId=92, jobId=28, partitionId=420, stageId=74, 
taskAttemptId=82047} INFO org.apache.celeborn.client.ShuffleClientImpl: Shuffle 
23 request reducer file group success using 59315 ms, result partition size 
12000
    ...
    25/01/31 14:40:54 {executorId=92, partitionId=11931, taskAttemptId=93846} 
INFO org.apache.spark.executor.Executor: Running task 11931.0 in stage 74.0 
(TID 93846)
    25/01/31 14:40:54 {jobId=28, executorId=92, taskAttemptId=93846, 
partitionId=11931, stageId=74} INFO 
org.apache.spark.shuffle.celeborn.SparkShuffleManager: Shuffle 24 write mode is 
changed to SORT because partition count 12000 is greater than threshold 2000
    25/01/31 14:40:54 {executorId=92, jobId=28, partitionId=11931, stageId=74, 
taskAttemptId=93846} INFO 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader: BatchOpenStream for 0 
cost 0ms
    25/01/31 14:40:54 {} WARN org.apache.celeborn.client.ShuffleClientImpl: 
Shuffle data is empty for shuffle 23 partition 11931.
    ```
    
    ### How was this patch tested?
    No additional tests for this: I've tried to reproduce it, but we've only 
seen this happen with high number of nodes and during long execution time range.
    
    ### More explanation on why/how this happens
    
    ```
    // write path
     override def setStageEnd(shuffleId: Int): Unit = {
        getReducerFileGroupRequest synchronized {
          stageEndShuffleSet.add(shuffleId)
        }
    ....
    
    // read path
     override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: 
Int): Unit = {
        // Quick return for ended stage, avoid occupy sync lock.
        if (isStageEnd(shuffleId)) {
          replyGetReducerFileGroup(context, shuffleId)
        } else {
          getReducerFileGroupRequest.synchronized {
    ...
    
    override def isStageEnd(shuffleId: Int): Boolean = {
        stageEndShuffleSet.contains(shuffleId)
      }
    ```
    
    Since concurrency guarantees between read/write path are based on 
ConcurrentHashMap's volatile values there's no guarantee that content of a 
HashSet would be seen fully by the reader thread.
    
    Closes #3100 from aidar-stripe/main-commit-concurrency-fix.
    
    Authored-by: Aidar Bariev <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../main/scala/org/apache/celeborn/client/commit/CommitHandler.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index da95486eb..82166afe1 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -491,7 +491,7 @@ abstract class CommitHandler(
     committedPartitions.values().asScala.foreach { partition =>
       val partitionLocations = 
reducerFileGroupsMap.get(shuffleId).computeIfAbsent(
         partition.getId,
-        (k: Integer) => new util.HashSet[PartitionLocation]())
+        (k: Integer) => ConcurrentHashMap.newKeySet[PartitionLocation]())
       partitionLocations.add(partition)
     }
   }

Reply via email to