This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 09d0ac6c8 [CELEBORN-1718] Fix memory storage file won't hard split when memory file is full and worker has no disks
09d0ac6c8 is described below

commit 09d0ac6c81a6900607e2f038ee84851a491b48c4
Author: mingji <[email protected]>
AuthorDate: Fri Nov 15 19:24:25 2024 +0800

    [CELEBORN-1718] Fix memory storage file won't hard split when memory file is full and worker has no disks
    
    ### What changes were proposed in this pull request?
    Return hard split if a memory storage file is full and the worker has no disks.
    
    ### Why are the changes needed?
    In the current implementation, a task might hang because the worker rejected the shuffle data and returned nothing.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA.
    
    Closes #2918 from FMX/b1718.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 11a4007965ba23868ce6280688ae221947161749)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala  | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 1b93014bc..7f044084c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1224,8 +1224,13 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
          |fileName:${fileWriter.getCurrentFileInfo.getFilePath}
          |""".stripMargin)
     if (fileWriter.needHardSplitForMemoryShuffleStorage()) {
+      workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
+      callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+      logInfo(
+        s"Do hardSplit for memory shuffle file fileLength:${fileWriter.getMemoryFileInfo.getFileLength}")
       return true
     }
+
     val diskFileInfo = fileWriter.getDiskFileInfo
     if (diskFileInfo != null) {
       if (workerPartitionSplitEnabled && ((diskFull && diskFileInfo.getFileLength > partitionSplitMinimumSize) ||
