This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7542adf70 [CELEBORN-1948] Fix the issue where replica may lose data 
when HARD_SPLIT occurs during handlePushMergeData
7542adf70 is described below

commit 7542adf708a89f8f15429faacb7e9aac5842e6d1
Author: Xianming Lei <31424839+le...@users.noreply.github.com>
AuthorDate: Fri May 9 10:39:52 2025 +0800

    [CELEBORN-1948] Fix the issue where replica may lose data when HARD_SPLIT 
occurs during handlePushMergeData
    
    ### What changes were proposed in this pull request?
    Fix data loss that can occur when merged data is replicated to a replica and a HARD_SPLIT happens.
    
    ### Why are the changes needed?
    There is a problem with the replication RPC callback. When combining the 
status returned for the primary data with the status returned by the replica, 
the final status sent to the client must be the one with the highest precedence, 
in the order FAILURE > HARD_SPLIT > CONGESTION > SUCCESS. A lower-precedence 
status must never override a higher-precedence one.
    
    The current code has two problems:
    1. A HARD_SPLIT status can override a FAILURE status, which can interfere 
with the worker-exclusion logic.
    2. When a pushMergedData request contains some partition locations that 
have already been committed, PushDataHandler must keep replicating as long as 
any partition in the request has not been committed; otherwise data will be lost.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #3185 from leixm/CELEBORN-1948.
    
    Lead-authored-by: Xianming Lei <31424839+le...@users.noreply.github.com>
    Co-authored-by: Xianming Lei <xianming....@shopee.com>
    Co-authored-by: Shuang <lvshuang....@alibaba-inc.com>
    Signed-off-by: Shuang <lvshuang....@alibaba-inc.com>
---
 .../service/deploy/worker/PushDataHandler.scala    | 92 +++++++++++-----------
 1 file changed, 46 insertions(+), 46 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index d74e7a380..9edf91741 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -490,8 +490,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
       }
 
     // Fetch real batchId from body will add more cost and no meaning for 
replicate.
-    val doReplicate =
-      partitionIdToLocations.head._2 != null && 
partitionIdToLocations.head._2.hasPeer && isPrimary
+    val doReplicate = isPrimary && partitionIdToLocations.exists(p => p._2 != 
null && p._2.hasPeer)
 
     // find FileWriters responsible for the data
     var index = 0
@@ -602,7 +601,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     val writePromise = Promise[Array[StatusCode]]()
     // for primary, send data to replica
     if (doReplicate) {
-      val location = partitionIdToLocations.head._2
+      val location = partitionIdToLocations.find(p => p._2 != null && 
p._2.hasPeer).get._2
       val peer = location.getPeer
       val peerWorker = new WorkerInfo(
         peer.getHost,
@@ -631,49 +630,6 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
           // Handle the response from replica
           val wrappedCallback = new RpcResponseCallback() {
             override def onSuccess(response: ByteBuffer): Unit = {
-              // During the rolling upgrade of the worker cluster, it is 
possible for
-              // the primary worker to be upgraded to a new version that 
includes
-              // the changes from [CELEBORN-1721], while the replica worker is 
still running
-              // on an older version that does not have these changes.
-              // In this scenario, the replica may return a response without 
any context
-              // when status of SUCCESS.
-              val replicaReason =
-                if (response.remaining() > 0) {
-                  response.get()
-                } else {
-                  StatusCode.SUCCESS
-                }
-              if (replicaReason == StatusCode.HARD_SPLIT.getValue) {
-                if (response.remaining() > 0) {
-                  try {
-                    val pushMergedDataResponse: 
PbPushMergedDataSplitPartitionInfo =
-                      TransportMessage.fromByteBuffer(
-                        
response).getParsedPayload[PbPushMergedDataSplitPartitionInfo]()
-                    pushMergedDataCallback.unionReplicaSplitPartitions(
-                      pushMergedDataResponse.getSplitPartitionIndexesList,
-                      pushMergedDataResponse.getStatusCodesList)
-                  } catch {
-                    case e: CelebornIOException =>
-                      pushMergedDataCallback.onFailure(e)
-                      return
-                    case e: IllegalArgumentException =>
-                      pushMergedDataCallback.onFailure(new 
CelebornIOException(e))
-                      return
-                  }
-                } else {
-                  // During the rolling upgrade of the worker cluster, it is 
possible for the primary worker
-                  // to be upgraded to a new version that includes the changes 
from [CELEBORN-1721], while
-                  // the replica worker is still running on an older version 
that does not have these changes.
-                  // In this scenario, the replica may return a response with 
a status of HARD_SPLIT, but
-                  // will not provide a PbPushMergedDataSplitPartitionInfo.
-                  logWarning(
-                    s"The response status from the replica (shuffle 
$shuffleKey map $mapId attempt $attemptId) is HARD_SPLIT, but no 
PbPushMergedDataSplitPartitionInfo is present.")
-                  partitionIdToLocations.indices.foreach(index =>
-                    pushMergedDataCallback.addSplitPartition(index, 
StatusCode.HARD_SPLIT))
-                  pushMergedDataCallback.onSuccess(StatusCode.HARD_SPLIT)
-                  return
-                }
-              }
               Try(Await.result(writePromise.future, Duration.Inf)) match {
                 case Success(result) =>
                   var index = 0
@@ -683,6 +639,50 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
                     }
                     index += 1
                   }
+                  // During the rolling upgrade of the worker cluster, it is 
possible for
+                  // the primary worker to be upgraded to a new version that 
includes
+                  // the changes from [CELEBORN-1721], while the replica 
worker is still running
+                  // on an older version that does not have these changes.
+                  // In this scenario, the replica may return a response 
without any context
+                  // when status of SUCCESS.
+                  val replicaReason =
+                    if (response.remaining() > 0) {
+                      response.get()
+                    } else {
+                      StatusCode.SUCCESS
+                    }
+                  if (replicaReason == StatusCode.HARD_SPLIT.getValue) {
+                    if (response.remaining() > 0) {
+                      try {
+                        val pushMergedDataResponse: 
PbPushMergedDataSplitPartitionInfo =
+                          TransportMessage.fromByteBuffer(
+                            
response).getParsedPayload[PbPushMergedDataSplitPartitionInfo]()
+                        pushMergedDataCallback.unionReplicaSplitPartitions(
+                          pushMergedDataResponse.getSplitPartitionIndexesList,
+                          pushMergedDataResponse.getStatusCodesList)
+                      } catch {
+                        case e: CelebornIOException =>
+                          pushMergedDataCallback.onFailure(e)
+                          return
+                        case e: IllegalArgumentException =>
+                          pushMergedDataCallback.onFailure(new 
CelebornIOException(e))
+                          return
+                      }
+                    } else {
+                      // During the rolling upgrade of the worker cluster, it 
is possible for the primary worker
+                      // to be upgraded to a new version that includes the 
changes from [CELEBORN-1721], while
+                      // the replica worker is still running on an older 
version that does not have these changes.
+                      // In this scenario, the replica may return a response 
with a status of HARD_SPLIT, but
+                      // will not provide a PbPushMergedDataSplitPartitionInfo.
+                      logWarning(
+                        s"The response status from the replica (shuffle 
$shuffleKey map $mapId attempt $attemptId) is HARD_SPLIT, but no 
PbPushMergedDataSplitPartitionInfo is present.")
+                      partitionIdToLocations.indices.foreach(index =>
+                        pushMergedDataCallback.addSplitPartition(index, 
StatusCode.HARD_SPLIT))
+                    }
+                    pushMergedDataCallback.onSuccess(StatusCode.HARD_SPLIT)
+                    return
+                  }
+
                   // Only primary data enable replication will push data to 
replica
                   Option(CongestionController.instance()) match {
                     case Some(congestionController) =>

Reply via email to