This is an automated email from the ASF dual-hosted git repository. rexxiong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push: new 7542adf70 [CELEBORN-1948] Fix the issue where replica may lose data when HARD_SPLIT occurs during handlePushMergeData 7542adf70 is described below commit 7542adf708a89f8f15429faacb7e9aac5842e6d1 Author: Xianming Lei <31424839+le...@users.noreply.github.com> AuthorDate: Fri May 9 10:39:52 2025 +0800 [CELEBORN-1948] Fix the issue where replica may lose data when HARD_SPLIT occurs during handlePushMergeData ### What changes were proposed in this pull request? Fix data loss that occurs when merged data is pushed to a replica and a HARD_SPLIT happens at the same time. ### Why are the changes needed? There is a problem with the replicate RPC callback. When combining the statuses returned by the primary and the replica, the final status reported to the client must follow the precedence FAILURE > HARD_SPLIT > CONGESTION > SUCCESS; a lower-precedence status must never override a higher-precedence one. The current code has two problems: 1. HARD_SPLIT can override FAILURE, which affects the worker-exclusion logic and may cause problems there. 2. When a pushMergedData request contains some partition locations that have already been committed, PushDataHandler must keep replicating as long as any partition in the request has not been committed; otherwise data loss occurs. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. Closes #3185 from leixm/CELEBORN-1948. 
Lead-authored-by: Xianming Lei <31424839+le...@users.noreply.github.com> Co-authored-by: Xianming Lei <xianming....@shopee.com> Co-authored-by: Shuang <lvshuang....@alibaba-inc.com> Signed-off-by: Shuang <lvshuang....@alibaba-inc.com> --- .../service/deploy/worker/PushDataHandler.scala | 92 +++++++++++----------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index d74e7a380..9edf91741 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -490,8 +490,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler } // Fetch real batchId from body will add more cost and no meaning for replicate. - val doReplicate = - partitionIdToLocations.head._2 != null && partitionIdToLocations.head._2.hasPeer && isPrimary + val doReplicate = isPrimary && partitionIdToLocations.exists(p => p._2 != null && p._2.hasPeer) // find FileWriters responsible for the data var index = 0 @@ -602,7 +601,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler val writePromise = Promise[Array[StatusCode]]() // for primary, send data to replica if (doReplicate) { - val location = partitionIdToLocations.head._2 + val location = partitionIdToLocations.find(p => p._2 != null && p._2.hasPeer).get._2 val peer = location.getPeer val peerWorker = new WorkerInfo( peer.getHost, @@ -631,49 +630,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler // Handle the response from replica val wrappedCallback = new RpcResponseCallback() { override def onSuccess(response: ByteBuffer): Unit = { - // During the rolling upgrade of the worker cluster, it is possible for - // the primary worker to be upgraded 
to a new version that includes - // the changes from [CELEBORN-1721], while the replica worker is still running - // on an older version that does not have these changes. - // In this scenario, the replica may return a response without any context - // when status of SUCCESS. - val replicaReason = - if (response.remaining() > 0) { - response.get() - } else { - StatusCode.SUCCESS - } - if (replicaReason == StatusCode.HARD_SPLIT.getValue) { - if (response.remaining() > 0) { - try { - val pushMergedDataResponse: PbPushMergedDataSplitPartitionInfo = - TransportMessage.fromByteBuffer( - response).getParsedPayload[PbPushMergedDataSplitPartitionInfo]() - pushMergedDataCallback.unionReplicaSplitPartitions( - pushMergedDataResponse.getSplitPartitionIndexesList, - pushMergedDataResponse.getStatusCodesList) - } catch { - case e: CelebornIOException => - pushMergedDataCallback.onFailure(e) - return - case e: IllegalArgumentException => - pushMergedDataCallback.onFailure(new CelebornIOException(e)) - return - } - } else { - // During the rolling upgrade of the worker cluster, it is possible for the primary worker - // to be upgraded to a new version that includes the changes from [CELEBORN-1721], while - // the replica worker is still running on an older version that does not have these changes. - // In this scenario, the replica may return a response with a status of HARD_SPLIT, but - // will not provide a PbPushMergedDataSplitPartitionInfo. 
- logWarning( - s"The response status from the replica (shuffle $shuffleKey map $mapId attempt $attemptId) is HARD_SPLIT, but no PbPushMergedDataSplitPartitionInfo is present.") - partitionIdToLocations.indices.foreach(index => - pushMergedDataCallback.addSplitPartition(index, StatusCode.HARD_SPLIT)) - pushMergedDataCallback.onSuccess(StatusCode.HARD_SPLIT) - return - } - } Try(Await.result(writePromise.future, Duration.Inf)) match { case Success(result) => var index = 0 @@ -683,6 +639,50 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler } index += 1 } + // During the rolling upgrade of the worker cluster, it is possible for + // the primary worker to be upgraded to a new version that includes + // the changes from [CELEBORN-1721], while the replica worker is still running + // on an older version that does not have these changes. + // In this scenario, the replica may return a response without any context + // when status of SUCCESS. + val replicaReason = + if (response.remaining() > 0) { + response.get() + } else { + StatusCode.SUCCESS + } + if (replicaReason == StatusCode.HARD_SPLIT.getValue) { + if (response.remaining() > 0) { + try { + val pushMergedDataResponse: PbPushMergedDataSplitPartitionInfo = + TransportMessage.fromByteBuffer( + response).getParsedPayload[PbPushMergedDataSplitPartitionInfo]() + pushMergedDataCallback.unionReplicaSplitPartitions( + pushMergedDataResponse.getSplitPartitionIndexesList, + pushMergedDataResponse.getStatusCodesList) + } catch { + case e: CelebornIOException => + pushMergedDataCallback.onFailure(e) + return + case e: IllegalArgumentException => + pushMergedDataCallback.onFailure(new CelebornIOException(e)) + return + } + } else { + // During the rolling upgrade of the worker cluster, it is possible for the primary worker + // to be upgraded to a new version that includes the changes from [CELEBORN-1721], while + // the replica worker is still running on an older version that does not have 
these changes. + // In this scenario, the replica may return a response with a status of HARD_SPLIT, but + // will not provide a PbPushMergedDataSplitPartitionInfo. + logWarning( + s"The response status from the replica (shuffle $shuffleKey map $mapId attempt $attemptId) is HARD_SPLIT, but no PbPushMergedDataSplitPartitionInfo is present.") + partitionIdToLocations.indices.foreach(index => + pushMergedDataCallback.addSplitPartition(index, StatusCode.HARD_SPLIT)) + } + pushMergedDataCallback.onSuccess(StatusCode.HARD_SPLIT) + return + } + // Only primary data enable replication will push data to replica Option(CongestionController.instance()) match { case Some(congestionController) =>