This is an automated email from the ASF dual-hosted git repository. angerszhuuuu pushed a commit to branch CELEBORN-1928- in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit f7021342e660a69d33f14c72bd925a978146441d Author: Angerszhuuuu <[email protected]> AuthorDate: Tue Mar 25 17:45:19 2025 +0800 CELEBORN-1928 [CIP-12] Support HARD_SPLIT in PushMergedData should support handling older worker success responses --- .../celeborn/service/deploy/worker/PushDataHandler.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 30a12535b..75c95e090 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -632,7 +632,17 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler // Handle the response from replica val wrappedCallback = new RpcResponseCallback() { override def onSuccess(response: ByteBuffer): Unit = { - val replicaReason = response.get() + // During the rolling upgrade of the worker cluster, it is possible for + // the primary worker to be upgraded to a new version that includes + // the changes from [CELEBORN-1721], while the replica worker is still running + // on an older version that does not have these changes. + // In this scenario, the replica may return a response without any context + // when status of SUCCESS. + val replicaReason = if (response.remaining() > 0) { + response.get() + } else { + StatusCode.SUCCESS + } if (replicaReason == StatusCode.HARD_SPLIT.getValue) { if (response.remaining() > 0) { try {
