This is an automated email from the ASF dual-hosted git repository. feiwang pushed a commit to branch branch-0.5 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit 5c6a2c85fcac0fc232ad7b108335d2c963a5a6ca Author: Wang, Fei <[email protected]> AuthorDate: Thu Nov 14 14:09:45 2024 +0800 [CELEBORN-1705] Fix disk buffer size is negative issue Fix the issue where the disk buffer size becomes negative. Previously, when writing through a PartitionDataWriter backed by memory file storage, the steps were: 1. if `isMemoryShuffleFile` is true, increment the memory file storage counter 2. check whether `evict` is needed; if so, flush the buffer and then set `isMemoryShuffleFile` to false 3. add the data into flushBuffer 4. if the memory file storage was evicted, the data buffer would eventually be released as disk buffer. As a result, the disk buffer size would end up negative, and the memory file storage size would always stay positive. In this PR, we update the counter after `evict` has finished. When there was no active running application in the Celeborn cluster, I found abnormal values in the Celeborn worker log. ``` 24/11/09 23:30:50,474 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 276.0 MiB/40.0 GiB, disk buffer size: -748726.0 B, sort memory size: 0.0 B, read buffer size: 0.0 B, memory file storage size : 731.2 KiB ``` ``` disk buffer size: -748726.0 B memory file storage size : 731.2 KiB ``` Both of them are expected to be 0. Does this PR introduce any user-facing change? No. How was this patch tested? UT and integration testing. <img width="1895" alt="image" src="https://github.com/user-attachments/assets/231dd0cd-e44d-49f7-b18d-2a3eb4f52c3b"> Closes #2916 from turboFei/memory_disk_size. 
Authored-by: Wang, Fei <[email protected]> Signed-off-by: mingji <[email protected]> (cherry picked from commit b755765ac0d188d5f5756d2cf252ff62623b5e2c) Signed-off-by: mingji <[email protected]> --- .../service/deploy/worker/memory/MemoryManager.java | 2 +- .../deploy/worker/storage/PartitionDataWriter.java | 20 +++++++++++--------- .../MemoryReducePartitionDataWriterSuiteJ.java | 4 ++++ 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index f2bf9be07..a18c80d1a 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -528,7 +528,7 @@ public class MemoryManager { return memoryFileStorageCounter.sum() < memoryFileStorageThreshold; } - public void increaseMemoryFileStorage(int bytes) { + public void incrementMemoryFileStorage(int bytes) { memoryFileStorageCounter.add(bytes); } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 5a967fedb..03930bc60 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -298,15 +298,6 @@ public abstract class PartitionDataWriter implements DeviceObserver { } final int numBytes = data.readableBytes(); - if (isMemoryShuffleFile.get()) { - MemoryManager.instance().increaseMemoryFileStorage(numBytes); - } else { - MemoryManager.instance().incrementDiskBuffer(numBytes); - if (userBufferInfo != null) { - userBufferInfo.updateInfo( - System.currentTimeMillis(), new 
BufferStatusHub.BufferStatusNode(numBytes)); - } - } synchronized (flushLock) { if (closed) { @@ -334,6 +325,17 @@ public abstract class PartitionDataWriter implements DeviceObserver { } } + // update the disk buffer or memory file storage after evict + if (isMemoryShuffleFile.get()) { + MemoryManager.instance().incrementMemoryFileStorage(numBytes); + } else { + MemoryManager.instance().incrementDiskBuffer(numBytes); + if (userBufferInfo != null) { + userBufferInfo.updateInfo( + System.currentTimeMillis(), new BufferStatusHub.BufferStatusNode(numBytes)); + } + } + data.retain(); flushBuffer.addComponent(true, data); if (isMemoryShuffleFile.get()) { diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java index cf71200dc..9879feaf9 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java @@ -456,6 +456,7 @@ public class MemoryReducePartitionDataWriterSuiteJ { @Test public void testEvictAndChunkRead() throws Exception { final int threadsNum = 16; + final long memoryFileStorageBefore = MemoryManager.instance().getMemoryFileStorageCounter(); PartitionDataWriter partitionDataWriter = new ReducePartitionDataWriter( PartitionDataWriterSuiteUtils.prepareMemoryEvictEnvironment( @@ -526,6 +527,9 @@ public class MemoryReducePartitionDataWriterSuiteJ { result.releaseBuffers(); closeChunkServer(); + + assert storageManager.evictedFileCount().get() > 0; + assert MemoryManager.instance().getMemoryFileStorageCounter() == memoryFileStorageBefore; } @Test
