This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 006ee5a99 [CELEBORN-2163] PushDataHandler should increment 
WriteDataFailCount for file writer exception of MapPartition PushData
006ee5a99 is described below

commit 006ee5a993a9f33427bf01c3e49dbd53a0a8f46b
Author: SteNicholas <[email protected]>
AuthorDate: Tue Oct 14 13:04:39 2025 +0800

    [CELEBORN-2163] PushDataHandler should increment WriteDataFailCount for 
file writer exception of MapPartition PushData
    
    ### What changes were proposed in this pull request?
    
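    `PushDataHandler` should increment `WriteDataFailCount` when the file writer
    throws an exception while handling MapPartition PushData. The patch also
    removes the `softSplit` parameter, which was passed as `null` at its call
    sites, and collapses `if`/`else` branches whose two arms were identical.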
    
    ### Why are the changes needed?
    
    `PushDataHandler` does not increment `WriteDataFailCount` when the file
    writer throws an exception while handling MapPartition PushData, which
    causes the `WriteDataFailCount` metric to always be zero.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3492 from SteNicholas/CELEBORN-2163.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit fb8703077f19de6d5ebf45df0c5db9d81588a005)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../service/deploy/worker/PushDataHandler.scala    | 42 ++++++----------------
 1 file changed, 10 insertions(+), 32 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 3e526a71e..892996780 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1020,7 +1020,6 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
         pushData.`type`(),
         isPrimary,
         pushData.requestId,
-        null,
         location,
         if (isPrimary) WorkerSource.PRIMARY_PUSH_DATA_TIME else 
WorkerSource.REPLICA_PUSH_DATA_TIME,
         callback)
@@ -1054,27 +1053,14 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     val writePromise = Promise[Array[StatusCode]]()
     writeLocalData(Seq(fileWriter), body, shuffleKey, isPrimary, None, 
writePromise)
     // for primary, send data to replica
-    if (location.hasPeer && isPrimary) {
-      // to do
-      Try(Await.result(writePromise.future, Duration.Inf)) match {
-        case Success(result) =>
-          if (result(0) != StatusCode.SUCCESS) {
-            wrappedCallback.onFailure(new CelebornIOException("Write data 
failed!"))
-          } else {
-            wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
-          }
-        case Failure(e) => wrappedCallback.onFailure(e)
-      }
-    } else {
-      Try(Await.result(writePromise.future, Duration.Inf)) match {
-        case Success(result) =>
-          if (result(0) != StatusCode.SUCCESS) {
-            wrappedCallback.onFailure(new CelebornIOException("Write data 
failed!"))
-          } else {
-            wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
-          }
-        case Failure(e) => wrappedCallback.onFailure(e)
-      }
+    Try(Await.result(writePromise.future, Duration.Inf)) match {
+      case Success(result) =>
+        if (result(0) != StatusCode.SUCCESS) {
+          wrappedCallback.onFailure(new CelebornIOException("Write data 
failed!"))
+        } else {
+          wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+        }
+      case Failure(e) => wrappedCallback.onFailure(e)
     }
   }
 
@@ -1238,7 +1224,6 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
         messageType,
         isPrimary,
         requestId,
-        null,
         location,
         if (isPrimary) workerSourcePrimary else workerSourceReplica,
         callback)
@@ -1306,12 +1291,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
         case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
       // for primary , send data to replica
-      if (location.hasPeer && isPrimary) {
-        // TODO replica
-        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
-      } else {
-        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
-      }
+      wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
     } catch {
       case t: Throwable =>
         callback.onFailure(new CelebornIOException(s"$messageType failed", t))
@@ -1322,7 +1302,6 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
       messageType: Message.Type,
       isPrimary: Boolean,
       requestId: Long,
-      softSplit: AtomicBoolean,
       location: PartitionLocation,
       workerSourceTime: String,
       callback: RpcResponseCallback)
@@ -1335,8 +1314,6 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
           resp.put(response)
           resp.flip()
           callback.onSuccess(resp)
-        } else if (softSplit != null && softSplit.get()) {
-          
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
         } else {
           callback.onSuccess(response)
         }
@@ -1407,6 +1384,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     val (messagePrimary, messageReplica) =
       messageType match {
         case Type.PUSH_DATA =>
+          workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
           (
             StatusCode.PUSH_DATA_WRITE_FAIL_PRIMARY,
             StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA)

Reply via email to