This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 11a400796 [CELEBORN-1718] Fix memory storage file won't hard split
when memory file is full and worker has no disks
11a400796 is described below
commit 11a4007965ba23868ce6280688ae221947161749
Author: mingji <[email protected]>
AuthorDate: Fri Nov 15 19:24:25 2024 +0800
[CELEBORN-1718] Fix memory storage file won't hard split when memory file
is full and worker has no disks
### What changes were proposed in this pull request?
Return a HARD_SPLIT status if a memory storage file is full and the worker
has no disks.
### Why are the changes needed?
In the current implementation, a task might hang because the worker rejects
the shuffle data and returns nothing.
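
The decision described above can be sketched roughly as follows. This is a minimal, simplified model and not the actual Celeborn code: `HardSplitSketch`, `Reply`, `MemoryFile`, `checkMemoryFile`, and the `workerHasDisks` flag are hypothetical names introduced only for illustration. The point is that the worker always sends an explicit reply when it cannot accept more data, so the client can split instead of waiting.

```scala
// Hypothetical, simplified model of the check; none of these names
// come from Celeborn itself.
object HardSplitSketch {
  sealed trait Reply
  case object HardSplit extends Reply

  // Assumed stand-in for an in-memory shuffle file with a fixed capacity.
  final case class MemoryFile(length: Long, capacity: Long) {
    def isFull: Boolean = length >= capacity
  }

  // Returns Some(reply) when the worker must answer immediately,
  // or None when the normal write path can continue.
  def checkMemoryFile(file: MemoryFile, workerHasDisks: Boolean): Option[Reply] =
    if (file.isFull && !workerHasDisks) Some(HardSplit) // nowhere to evict the data
    else None
}
```

In this sketch a full file on a disk-less worker yields `Some(HardSplit)`, while every other combination yields `None`.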
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #2918 from FMX/b1718.
Authored-by: mingji <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 592e93f09..b6ddff261 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1251,8 +1251,13 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
|fileName:${fileWriter.getCurrentFileInfo.getFilePath}
|""".stripMargin)
if (fileWriter.needHardSplitForMemoryShuffleStorage()) {
+ workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
+ callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+ logInfo(
+ s"Do hardSplit for memory shuffle file fileLength:${fileWriter.getMemoryFileInfo.getFileLength}")
return true
}
+
val diskFileInfo = fileWriter.getDiskFileInfo
if (diskFileInfo != null) {
if (workerPartitionSplitEnabled && ((diskFull && diskFileInfo.getFileLength > partitionSplitMinimumSize) ||