This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b755765ac [CELEBORN-1705] Fix disk buffer size is negative issue
b755765ac is described below

commit b755765ac0d188d5f5756d2cf252ff62623b5e2c
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Nov 14 14:09:45 2024 +0800

    [CELEBORN-1705] Fix disk buffer size is negative issue
    
    ### What changes were proposed in this pull request?
    
    Fix the issue where the disk buffer size becomes negative.
    
    Before this change, when writing to a PartitionDataWriter with memory file storage:
    1. if `isMemoryShuffleFile` is true, the memory file storage counter is incremented
    2. the writer checks whether `evict` is needed; if so, it flushes the buffer and then sets `isMemoryShuffleFile` to false
    3. the data is added to flushBuffer
    4. if the memory file storage was evicted, the data buffer is eventually released as a disk buffer.
    
    As a result, the disk buffer size eventually becomes negative, while the memory file storage size always stays positive.
    
    In this PR, we update the counter after `evict` has finished.
    
    ### Why are the changes needed?
    With no active running applications in the Celeborn cluster, I found abnormal values in the Celeborn worker log:
    ```
    24/11/09 23:30:50,474 INFO [worker-memory-manager-reporter] MemoryManager: 
Direct memory usage: 276.0 MiB/40.0 GiB, disk buffer size: -748726.0 B, sort 
memory size: 0.0 B, read buffer size: 0.0 B, memory file storage size : 731.2 
KiB
    ```
    
    ```
    disk buffer size: -748726.0 B
    memory file storage size : 731.2 KiB
    ```
    
    Both of them are expected to be 0.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    UT and Integration testing.
    
    <img width="1895" alt="image" src="https://github.com/user-attachments/assets/231dd0cd-e44d-49f7-b18d-2a3eb4f52c3b">
    
    Closes #2916 from turboFei/memory_disk_size.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../service/deploy/worker/memory/MemoryManager.java    |  2 +-
 .../deploy/worker/storage/PartitionDataWriter.java     | 18 ++++++++++--------
 .../memory/MemoryReducePartitionDataWriterSuiteJ.java  |  4 ++++
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 96425ecf8..0b7adcb91 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -528,7 +528,7 @@ public class MemoryManager {
     return memoryFileStorageCounter.sum() < memoryFileStorageThreshold;
   }
 
-  public void increaseMemoryFileStorage(int bytes) {
+  public void incrementMemoryFileStorage(int bytes) {
     memoryFileStorageCounter.add(bytes);
   }
 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 542ccd249..af93b132a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -316,14 +316,6 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     }
 
     final int numBytes = data.readableBytes();
-    if (isMemoryShuffleFile.get()) {
-      MemoryManager.instance().increaseMemoryFileStorage(numBytes);
-    } else {
-      MemoryManager.instance().incrementDiskBuffer(numBytes);
-      if (userCongestionControlContext != null) {
-        userCongestionControlContext.updateProduceBytes(numBytes);
-      }
-    }
 
     synchronized (flushLock) {
       if (closed) {
@@ -351,6 +343,16 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
         }
       }
 
+      // update the disk buffer or memory file storage after evict
+      if (isMemoryShuffleFile.get()) {
+        MemoryManager.instance().incrementMemoryFileStorage(numBytes);
+      } else {
+        MemoryManager.instance().incrementDiskBuffer(numBytes);
+        if (userCongestionControlContext != null) {
+          userCongestionControlContext.updateProduceBytes(numBytes);
+        }
+      }
+
       data.retain();
       flushBuffer.addComponent(true, data);
       if (isMemoryShuffleFile.get()) {
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index e2d9f6a41..479d19676 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -456,6 +456,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
   @Test
   public void testEvictAndChunkRead() throws Exception {
     final int threadsNum = 16;
+    final long memoryFileStorageBefore = 
MemoryManager.instance().getMemoryFileStorageCounter();
     PartitionDataWriter partitionDataWriter =
         new ReducePartitionDataWriter(
             PartitionDataWriterSuiteUtils.prepareMemoryEvictEnvironment(
@@ -526,6 +527,9 @@ public class MemoryReducePartitionDataWriterSuiteJ {
     result.releaseBuffers();
 
     closeChunkServer();
+
+    assert storageManager.evictedFileCount().get() > 0;
+    assert MemoryManager.instance().getMemoryFileStorageCounter() == 
memoryFileStorageBefore;
   }
 
   @Test

Reply via email to