This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 006ee5a99 [CELEBORN-2163] PushDataHandler should increment
WriteDataFailCount for file writer exception of MapPartition PushData
006ee5a99 is described below
commit 006ee5a993a9f33427bf01c3e49dbd53a0a8f46b
Author: SteNicholas <[email protected]>
AuthorDate: Tue Oct 14 13:04:39 2025 +0800
[CELEBORN-2163] PushDataHandler should increment WriteDataFailCount for
file writer exception of MapPartition PushData
### What changes were proposed in this pull request?
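`PushDataHandler` should increment `WriteDataFailCount` when a file writer
exception occurs while handling MapPartition PushData. The change also removes
the unused `softSplit` parameter and collapses duplicated primary/replica
branches whose bodies were identical.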
### Why are the changes needed?
`PushDataHandler` does not increment `WriteDataFailCount` when a file writer
exception occurs while handling MapPartition PushData, so the `WriteDataFailCount`
metric always stays at zero.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3492 from SteNicholas/CELEBORN-2163.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit fb8703077f19de6d5ebf45df0c5db9d81588a005)
Signed-off-by: SteNicholas <[email protected]>
---
.../service/deploy/worker/PushDataHandler.scala | 42 ++++++----------------
1 file changed, 10 insertions(+), 32 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 3e526a71e..892996780 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1020,7 +1020,6 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
pushData.`type`(),
isPrimary,
pushData.requestId,
- null,
location,
if (isPrimary) WorkerSource.PRIMARY_PUSH_DATA_TIME else
WorkerSource.REPLICA_PUSH_DATA_TIME,
callback)
@@ -1054,27 +1053,14 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
val writePromise = Promise[Array[StatusCode]]()
writeLocalData(Seq(fileWriter), body, shuffleKey, isPrimary, None,
writePromise)
// for primary, send data to replica
- if (location.hasPeer && isPrimary) {
- // to do
- Try(Await.result(writePromise.future, Duration.Inf)) match {
- case Success(result) =>
- if (result(0) != StatusCode.SUCCESS) {
- wrappedCallback.onFailure(new CelebornIOException("Write data
failed!"))
- } else {
- wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
- }
- case Failure(e) => wrappedCallback.onFailure(e)
- }
- } else {
- Try(Await.result(writePromise.future, Duration.Inf)) match {
- case Success(result) =>
- if (result(0) != StatusCode.SUCCESS) {
- wrappedCallback.onFailure(new CelebornIOException("Write data
failed!"))
- } else {
- wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
- }
- case Failure(e) => wrappedCallback.onFailure(e)
- }
+ Try(Await.result(writePromise.future, Duration.Inf)) match {
+ case Success(result) =>
+ if (result(0) != StatusCode.SUCCESS) {
+ wrappedCallback.onFailure(new CelebornIOException("Write data
failed!"))
+ } else {
+ wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+ }
+ case Failure(e) => wrappedCallback.onFailure(e)
}
}
@@ -1238,7 +1224,6 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
messageType,
isPrimary,
requestId,
- null,
location,
if (isPrimary) workerSourcePrimary else workerSourceReplica,
callback)
@@ -1306,12 +1291,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
case _ => throw new IllegalArgumentException(s"Not support
$messageType yet")
}
// for primary , send data to replica
- if (location.hasPeer && isPrimary) {
- // TODO replica
- wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
- } else {
- wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
- }
+ wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
} catch {
case t: Throwable =>
callback.onFailure(new CelebornIOException(s"$messageType failed", t))
@@ -1322,7 +1302,6 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
messageType: Message.Type,
isPrimary: Boolean,
requestId: Long,
- softSplit: AtomicBoolean,
location: PartitionLocation,
workerSourceTime: String,
callback: RpcResponseCallback)
@@ -1335,8 +1314,6 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
resp.put(response)
resp.flip()
callback.onSuccess(resp)
- } else if (softSplit != null && softSplit.get()) {
-
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
} else {
callback.onSuccess(response)
}
@@ -1407,6 +1384,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
val (messagePrimary, messageReplica) =
messageType match {
case Type.PUSH_DATA =>
+ workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
(
StatusCode.PUSH_DATA_WRITE_FAIL_PRIMARY,
StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA)