This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 193a1e037 [CELEBORN-962] Add check DiskInfo#Status in PushDataHandler#checkDiskFull
193a1e037 is described below

commit 193a1e03729afaa0364dc9ac50b6f2732026715a
Author: jiaoqingbo <[email protected]>
AuthorDate: Thu Sep 14 19:53:28 2023 +0800

    [CELEBORN-962] Add check DiskInfo#Status in PushDataHandler#checkDiskFull
    
    ### What changes were proposed in this pull request?
    
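    `PushDataHandler#checkDiskFull` now also reports a disk as full when its
    `DiskInfo` status is `DiskStatus.HIGH_DISK_USAGE`, in addition to the
    existing `actualUsableSpace < diskReserveSize` check.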
    
    ### Why are the changes needed?
    
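    Previously `checkDiskFull` looked only at `actualUsableSpace` and ignored
    the status recorded on the `DiskInfo`, so a disk already marked
    `HIGH_DISK_USAGE` was not treated as full by this check.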
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Passed GA (GitHub Actions CI).
    
    Closes #1909 from jiaoqingbo/CELEBORN-962.
    
    Authored-by: jiaoqingbo <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 03fc00e6a6177da3a6c8205e4b47551d4c791ca6)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/service/deploy/worker/PushDataHandler.scala  | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 36167ad47..68facb601 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -26,7 +26,7 @@ import io.netty.buffer.ByteBuf
 
 import org.apache.celeborn.common.exception.{AlreadyClosedException, 
CelebornIOException}
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{WorkerInfo, 
WorkerPartitionLocationInfo}
+import org.apache.celeborn.common.meta.{DiskStatus, WorkerInfo, 
WorkerPartitionLocationInfo}
 import org.apache.celeborn.common.metrics.source.Source
 import org.apache.celeborn.common.network.buffer.{NettyManagedBuffer, 
NioManagedBuffer}
 import org.apache.celeborn.common.network.client.{RpcResponseCallback, 
TransportClient, TransportClientFactory}
@@ -1105,9 +1105,12 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
     if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
       return false
     }
-    val diskFull = workerInfo.diskInfos
+    val diskInfo = workerInfo.diskInfos
       .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
-      .actualUsableSpace < diskReserveSize
+
+    val diskFull = diskInfo.status.equals(
+      DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace < 
diskReserveSize
+
     diskFull
   }
 

Reply via email to