This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 28b2f56e1 [CELEBORN-1705] Fix disk buffer size is negative issue
28b2f56e1 is described below

commit 28b2f56e185882a73012f9c8317b5230a69b7188
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Nov 14 14:09:45 2024 +0800

    [CELEBORN-1705] Fix disk buffer size is negative issue
    
    Fix the issue where the reported disk buffer size becomes negative.
    
    Before, when writing to a PartitionDataWriter with memory file storage:
    1. if `isMemoryShuffleFile` is true, increment the memory file storage counter
    2. check whether `evict` is needed; if so, flush the buffer and then set
       `isMemoryShuffleFile` to false
    3. add the data to flushBuffer
    4. if the memory file storage was evicted, the data buffer would eventually be
       released as a disk buffer.
    
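    As a result, bytes that were counted as memory file storage were later released
    from the disk buffer counter, so the disk buffer size would end up negative and
    the memory file storage size would stay positive.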
    
    In this PR, we update the counter after `evict` has finished.
    
    When there were no active running applications in the Celeborn cluster, I found
    that the values reported in the Celeborn worker log were abnormal:
    ```
    24/11/09 23:30:50,474 INFO [worker-memory-manager-reporter] MemoryManager: 
Direct memory usage: 276.0 MiB/40.0 GiB, disk buffer size: -748726.0 B, sort 
memory size: 0.0 B, read buffer size: 0.0 B, memory file storage size : 731.2 
KiB
    ```
    
    ```
    disk buffer size: -748726.0 B
    memory file storage size : 731.2 KiB
    ```
    
    Both of them are expected to be 0.
    
    No.
    
    UT and Integration testing.
    
    <img width="1895" alt="image" src="https://github.com/user-attachments/assets/231dd0cd-e44d-49f7-b18d-2a3eb4f52c3b">
    
    Closes #2916 from turboFei/memory_disk_size.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit b755765ac0d188d5f5756d2cf252ff62623b5e2c)
    Signed-off-by: mingji <[email protected]>
---
 .../service/deploy/worker/memory/MemoryManager.java   |  2 +-
 .../deploy/worker/storage/PartitionDataWriter.java    | 19 ++++++++++---------
 .../memory/MemoryReducePartitionDataWriterSuiteJ.java |  4 ++++
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index f2bf9be07..a18c80d1a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -528,7 +528,7 @@ public class MemoryManager {
     return memoryFileStorageCounter.sum() < memoryFileStorageThreshold;
   }
 
-  public void increaseMemoryFileStorage(int bytes) {
+  public void incrementMemoryFileStorage(int bytes) {
     memoryFileStorageCounter.add(bytes);
   }
 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 5a967fedb..fab36fcd9 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -298,15 +298,6 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     }
 
     final int numBytes = data.readableBytes();
-    if (isMemoryShuffleFile.get()) {
-      MemoryManager.instance().increaseMemoryFileStorage(numBytes);
-    } else {
-      MemoryManager.instance().incrementDiskBuffer(numBytes);
-      if (userBufferInfo != null) {
-        userBufferInfo.updateInfo(
-            System.currentTimeMillis(), new 
BufferStatusHub.BufferStatusNode(numBytes));
-      }
-    }
 
     synchronized (flushLock) {
       if (closed) {
@@ -334,6 +325,16 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
         }
       }
 
+      // update the disk buffer or memory file storage after evict
+      if (isMemoryShuffleFile.get()) {
+        MemoryManager.instance().incrementMemoryFileStorage(numBytes);
+      } else {
+        MemoryManager.instance().incrementDiskBuffer(numBytes);
+        if (userCongestionControlContext != null) {
+          userCongestionControlContext.updateProduceBytes(numBytes);
+        }
+      }
+
       data.retain();
       flushBuffer.addComponent(true, data);
       if (isMemoryShuffleFile.get()) {
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index cf71200dc..9879feaf9 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -456,6 +456,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
   @Test
   public void testEvictAndChunkRead() throws Exception {
     final int threadsNum = 16;
+    final long memoryFileStorageBefore = 
MemoryManager.instance().getMemoryFileStorageCounter();
     PartitionDataWriter partitionDataWriter =
         new ReducePartitionDataWriter(
             PartitionDataWriterSuiteUtils.prepareMemoryEvictEnvironment(
@@ -526,6 +527,9 @@ public class MemoryReducePartitionDataWriterSuiteJ {
     result.releaseBuffers();
 
     closeChunkServer();
+
+    assert storageManager.evictedFileCount().get() > 0;
+    assert MemoryManager.instance().getMemoryFileStorageCounter() == 
memoryFileStorageBefore;
   }
 
   @Test

Reply via email to