This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new bc7da3154 [CELEBORN-354][Flink] fix succeedPartitionIds may contain new added partitionIds (#1289)
bc7da3154 is described below
commit bc7da3154f463700d081cfa24b2fb1953b5cef0c
Author: Shuang <[email protected]>
AuthorDate: Wed Mar 1 15:45:24 2023 +0800
[CELEBORN-354][Flink] fix succeedPartitionIds may contain new added partitionIds (#1289)
---
.../celeborn/client/commit/MapPartitionCommitHandler.scala | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
index 04c8f0649..090c7a7f1 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
@@ -54,7 +54,7 @@ class MapPartitionCommitHandler(
extends CommitHandler(appId, conf, allocatedWorkers, committedPartitionInfo)
with Logging {
- private val shuffleSuccessPartitionIds = new ConcurrentHashMap[Int,
util.Set[Integer]]()
+ private val shuffleSucceedPartitionIds = new ConcurrentHashMap[Int,
util.Set[Integer]]()
// shuffleId -> in processing partitionId set
private val inProcessMapPartitionEndIds = new ConcurrentHashMap[Int,
util.Set[Integer]]()
@@ -123,6 +123,7 @@ class MapPartitionCommitHandler(
override def removeExpiredShuffle(shuffleId: Int): Unit = {
inProcessMapPartitionEndIds.remove(shuffleId)
+ shuffleSucceedPartitionIds.remove(shuffleId)
super.removeExpiredShuffle(shuffleId)
}
@@ -218,7 +219,7 @@ class MapPartitionCommitHandler(
inProcessingPartitionIds.remove(partitionId)
if (dataCommitSuccess) {
val resultPartitions =
- shuffleSuccessPartitionIds.computeIfAbsent(
+ shuffleSucceedPartitionIds.computeIfAbsent(
shuffleId,
(k: Int) => ConcurrentHashMap.newKeySet[Integer]())
resultPartitions.add(partitionId)
@@ -228,10 +229,15 @@ class MapPartitionCommitHandler(
}
override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId:
Int): Unit = {
+ // we need obtain the last succeed partitionIds
+ val lastSucceedPartitionIds =
+ shuffleSucceedPartitionIds.getOrDefault(shuffleId, new
util.HashSet[Integer]())
+ val succeedPartitionIds = new
util.HashSet[Integer](lastSucceedPartitionIds)
+
context.reply(GetReducerFileGroupResponse(
StatusCode.SUCCESS,
reducerFileGroupsMap.getOrDefault(shuffleId, new ConcurrentHashMap()),
getMapperAttempts(shuffleId),
- shuffleSuccessPartitionIds.get(shuffleId)))
+ succeedPartitionIds))
}
}