This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new bc7da3154 [CELEBORN-354][Flink] fix succeedPartitionIds may contain new added partitionIds (#1289)
bc7da3154 is described below

commit bc7da3154f463700d081cfa24b2fb1953b5cef0c
Author: Shuang <[email protected]>
AuthorDate: Wed Mar 1 15:45:24 2023 +0800

    [CELEBORN-354][Flink] fix succeedPartitionIds may contain new added partitionIds (#1289)
---
 .../celeborn/client/commit/MapPartitionCommitHandler.scala   | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
index 04c8f0649..090c7a7f1 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
@@ -54,7 +54,7 @@ class MapPartitionCommitHandler(
   extends CommitHandler(appId, conf, allocatedWorkers, committedPartitionInfo)
   with Logging {
 
-  private val shuffleSuccessPartitionIds = new ConcurrentHashMap[Int, 
util.Set[Integer]]()
+  private val shuffleSucceedPartitionIds = new ConcurrentHashMap[Int, 
util.Set[Integer]]()
 
   // shuffleId -> in processing partitionId set
   private val inProcessMapPartitionEndIds = new ConcurrentHashMap[Int, 
util.Set[Integer]]()
@@ -123,6 +123,7 @@ class MapPartitionCommitHandler(
 
   override def removeExpiredShuffle(shuffleId: Int): Unit = {
     inProcessMapPartitionEndIds.remove(shuffleId)
+    shuffleSucceedPartitionIds.remove(shuffleId)
     super.removeExpiredShuffle(shuffleId)
   }
 
@@ -218,7 +219,7 @@ class MapPartitionCommitHandler(
     inProcessingPartitionIds.remove(partitionId)
     if (dataCommitSuccess) {
       val resultPartitions =
-        shuffleSuccessPartitionIds.computeIfAbsent(
+        shuffleSucceedPartitionIds.computeIfAbsent(
           shuffleId,
           (k: Int) => ConcurrentHashMap.newKeySet[Integer]())
       resultPartitions.add(partitionId)
@@ -228,10 +229,15 @@ class MapPartitionCommitHandler(
   }
 
   override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: 
Int): Unit = {
+    // we need obtain the last succeed partitionIds
+    val lastSucceedPartitionIds =
+      shuffleSucceedPartitionIds.getOrDefault(shuffleId, new 
util.HashSet[Integer]())
+    val succeedPartitionIds = new 
util.HashSet[Integer](lastSucceedPartitionIds)
+
     context.reply(GetReducerFileGroupResponse(
       StatusCode.SUCCESS,
       reducerFileGroupsMap.getOrDefault(shuffleId, new ConcurrentHashMap()),
       getMapperAttempts(shuffleId),
-      shuffleSuccessPartitionIds.get(shuffleId)))
+      succeedPartitionIds))
   }
 }

Reply via email to