This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 193a1e037 [CELEBORN-962] Add check DiskInfo#Status in
PushDataHandler#checkDiskFull
193a1e037 is described below
commit 193a1e03729afaa0364dc9ac50b6f2732026715a
Author: jiaoqingbo <[email protected]>
AuthorDate: Thu Sep 14 19:53:28 2023 +0800
[CELEBORN-962] Add check DiskInfo#Status in PushDataHandler#checkDiskFull
### What changes were proposed in this pull request?
As Title
### Why are the changes needed?
As Title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes #1909 from jiaoqingbo/CELEBORN-962.
Authored-by: jiaoqingbo <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 03fc00e6a6177da3a6c8205e4b47551d4c791ca6)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/service/deploy/worker/PushDataHandler.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 36167ad47..68facb601 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -26,7 +26,7 @@ import io.netty.buffer.ByteBuf
import org.apache.celeborn.common.exception.{AlreadyClosedException,
CelebornIOException}
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{WorkerInfo,
WorkerPartitionLocationInfo}
+import org.apache.celeborn.common.meta.{DiskStatus, WorkerInfo,
WorkerPartitionLocationInfo}
import org.apache.celeborn.common.metrics.source.Source
import org.apache.celeborn.common.network.buffer.{NettyManagedBuffer,
NioManagedBuffer}
import org.apache.celeborn.common.network.client.{RpcResponseCallback,
TransportClient, TransportClientFactory}
@@ -1105,9 +1105,12 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
return false
}
- val diskFull = workerInfo.diskInfos
+ val diskInfo = workerInfo.diskInfos
.get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
- .actualUsableSpace < diskReserveSize
+
+ val diskFull = diskInfo.status.equals(
+ DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <
diskReserveSize
+
diskFull
}