This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit 567e4b0a8b6dc488144babcc9db0e14dc40a54da
Author: Aidar Bariev <[email protected]>
AuthorDate: Thu Feb 27 10:27:12 2025 +0800

[CELEBORN-1883] Replace HashSet with ConcurrentHashMap.newKeySet for ShuffleFileGroups

### What changes were proposed in this pull request?

Replace the plain `HashSet` of `PartitionLocation`s with a concurrent set created via `ConcurrentHashMap.newKeySet`.

### Why are the changes needed?

We are seeing race conditions between `handleGetReducerFileGroup` and `tryFinalCommit`, where reducers complete without processing a partition even though that partition has data.

### Problematic logs

On the driver side:
```
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 commit files complete. File count 23200 using 240180 ms
...
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 partition 11931-0: primary lost, use replica PartitionLocation[ id-epoch:11931-0 host-rpcPort-pushPort-fetchPort-replicatePort:10.68.138.242-39557-35555-37139-39685 mode:REPLICA peer:(empty) storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, filePath=} mapIdBitMap:null].
...
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Succeed to handle stageEnd for 23.
```

On the executor side:
```
25/01/31 14:23:02 {executorId=92, jobId=28, partitionId=420, stageId=74, taskAttemptId=82047} INFO org.apache.celeborn.client.ShuffleClientImpl: Shuffle 23 request reducer file group success using 59315 ms, result partition size 12000
...
25/01/31 14:40:54 {executorId=92, partitionId=11931, taskAttemptId=93846} INFO org.apache.spark.executor.Executor: Running task 11931.0 in stage 74.0 (TID 93846)
25/01/31 14:40:54 {jobId=28, executorId=92, taskAttemptId=93846, partitionId=11931, stageId=74} INFO org.apache.spark.shuffle.celeborn.SparkShuffleManager: Shuffle 24 write mode is changed to SORT because partition count 12000 is greater than threshold 2000
25/01/31 14:40:54 {executorId=92, jobId=28, partitionId=11931, stageId=74, taskAttemptId=93846} INFO org.apache.spark.shuffle.celeborn.CelebornShuffleReader: BatchOpenStream for 0 cost 0ms
25/01/31 14:40:54 {} WARN org.apache.celeborn.client.ShuffleClientImpl: Shuffle data is empty for shuffle 23 partition 11931.
```

### How was this patch tested?

No additional tests for this: I've tried to reproduce the issue, but we've only seen it with a high number of nodes and over long execution times.

### More explanation on why/how this happens

```
// write path
override def setStageEnd(shuffleId: Int): Unit = {
  getReducerFileGroupRequest synchronized {
    stageEndShuffleSet.add(shuffleId)
  }
....

// read path
override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: Int): Unit = {
  // Quick return for ended stage, avoid occupy sync lock.
  if (isStageEnd(shuffleId)) {
    replyGetReducerFileGroup(context, shuffleId)
  } else {
    getReducerFileGroupRequest.synchronized {
      ...

override def isStageEnd(shuffleId: Int): Boolean = {
  stageEndShuffleSet.contains(shuffleId)
}
```

Since the concurrency guarantee between the write and read paths rests only on `ConcurrentHashMap`'s volatile values, there is no guarantee that the full contents of a plain `HashSet` stored as a value will be visible to the reader thread.

Closes #3100 from aidar-stripe/main-commit-concurrency-fix.
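The fix works because `ConcurrentHashMap.newKeySet()` returns a `Set` view backed by a `ConcurrentHashMap`, so elements added by one thread are safely published to other threads without external locking, unlike a plain `HashSet`. A minimal illustrative Java sketch (not Celeborn code; class and variable names are made up):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class NewKeySetDemo {
    public static void main(String[] args) throws InterruptedException {
        // A Set backed by a ConcurrentHashMap: concurrent adds are safe,
        // and each write happens-before subsequent reads by other threads.
        Set<Integer> partitions = ConcurrentHashMap.newKeySet();

        int writers = 4;
        int perWriter = 1000;
        CountDownLatch done = new CountDownLatch(writers);
        for (int w = 0; w < writers; w++) {
            final int base = w * perWriter;
            new Thread(() -> {
                for (int i = 0; i < perWriter; i++) {
                    partitions.add(base + i);
                }
                done.countDown();
            }).start();
        }
        done.await();

        // A plain HashSet under the same concurrent insertion could lose
        // elements or corrupt its internal table; the concurrent key set
        // reliably ends up with all 4000 distinct elements.
        System.out.println(partitions.size());
    }
}
```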
Authored-by: Aidar Bariev <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
 .../main/scala/org/apache/celeborn/client/commit/CommitHandler.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index da95486eb..82166afe1 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -491,7 +491,7 @@ abstract class CommitHandler(
     committedPartitions.values().asScala.foreach { partition =>
       val partitionLocations = reducerFileGroupsMap.get(shuffleId).computeIfAbsent(
         partition.getId,
-        (k: Integer) => new util.HashSet[PartitionLocation]())
+        (k: Integer) => ConcurrentHashMap.newKeySet[PartitionLocation]())
       partitionLocations.add(partition)
     }
   }
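The `computeIfAbsent`-plus-`newKeySet` pattern from the diff can be sketched in isolation as follows. This is a hedged, standalone Java sketch; `fileGroups` is a hypothetical stand-in for `reducerFileGroupsMap.get(shuffleId)`, and `String` stands in for `PartitionLocation`:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FileGroupSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for the per-shuffle map:
        // partition id -> set of partition locations.
        Map<Integer, Set<String>> fileGroups = new ConcurrentHashMap<>();

        // Mirrors the patched code: the value created inside computeIfAbsent
        // is itself a concurrent set, so later add() calls are safe and their
        // results are visible to readers outside any enclosing lock.
        fileGroups
            .computeIfAbsent(11931, k -> ConcurrentHashMap.newKeySet())
            .add("10.68.138.242-39557-35555-37139-39685");
        fileGroups
            .computeIfAbsent(11931, k -> ConcurrentHashMap.newKeySet())
            .add("10.68.138.242-39557-35555-37139-39685"); // duplicate; set dedupes

        System.out.println(fileGroups.get(11931).size());
    }
}
```

Note that `computeIfAbsent` on a `ConcurrentHashMap` is atomic, so two threads racing on the same key still share a single set.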
