This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b755765ac [CELEBORN-1705] Fix disk buffer size is negative issue
b755765ac is described below
commit b755765ac0d188d5f5756d2cf252ff62623b5e2c
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Nov 14 14:09:45 2024 +0800
[CELEBORN-1705] Fix disk buffer size is negative issue
### What changes were proposed in this pull request?
Fix the issue where the reported disk buffer size becomes negative.
Previously, when PartitionDataWriter wrote data with memory file storage
enabled, it would:
1. If `isMemoryShuffleFile` was true, increment the memory file storage
counter.
2. Check whether eviction was needed; if so, flush the buffer and then set
`isMemoryShuffleFile` to false.
3. Add the data to `flushBuffer`.
4. If the memory file storage had been evicted, eventually release the data
buffer as disk buffer.
As a result, the same bytes were counted as memory file storage but released
as disk buffer. The disk buffer size therefore drifted negative, and the memory
file storage size never returned to zero.
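This PR updates the disk buffer / memory file storage counters only after the
eviction check has finished. The bytes are then accounted against the storage
that actually holds the data. It also renames
`MemoryManager.increaseMemoryFileStorage` to `incrementMemoryFileStorage` for
consistency with `incrementDiskBuffer`.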
### Why are the changes needed?
With no applications running in the Celeborn cluster, the Celeborn worker log
reported abnormal values:
```
24/11/09 23:30:50,474 INFO [worker-memory-manager-reporter] MemoryManager:
Direct memory usage: 276.0 MiB/40.0 GiB, disk buffer size: -748726.0 B, sort
memory size: 0.0 B, read buffer size: 0.0 B, memory file storage size : 731.2
KiB
```
```
disk buffer size: -748726.0 B
memory file storage size : 731.2 KiB
```
Both values are expected to be 0 when no application is running.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests and integration testing.
<img width="1895" alt="image"
src="https://github.com/user-attachments/assets/231dd0cd-e44d-49f7-b18d-2a3eb4f52c3b">
Closes #2916 from turboFei/memory_disk_size.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../service/deploy/worker/memory/MemoryManager.java | 2 +-
.../deploy/worker/storage/PartitionDataWriter.java | 18 ++++++++++--------
.../memory/MemoryReducePartitionDataWriterSuiteJ.java | 4 ++++
3 files changed, 15 insertions(+), 9 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 96425ecf8..0b7adcb91 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -528,7 +528,7 @@ public class MemoryManager {
return memoryFileStorageCounter.sum() < memoryFileStorageThreshold;
}
- public void increaseMemoryFileStorage(int bytes) {
+ public void incrementMemoryFileStorage(int bytes) {
memoryFileStorageCounter.add(bytes);
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 542ccd249..af93b132a 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -316,14 +316,6 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
}
final int numBytes = data.readableBytes();
- if (isMemoryShuffleFile.get()) {
- MemoryManager.instance().increaseMemoryFileStorage(numBytes);
- } else {
- MemoryManager.instance().incrementDiskBuffer(numBytes);
- if (userCongestionControlContext != null) {
- userCongestionControlContext.updateProduceBytes(numBytes);
- }
- }
synchronized (flushLock) {
if (closed) {
@@ -351,6 +343,16 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
}
}
+ // update the disk buffer or memory file storage after evict
+ if (isMemoryShuffleFile.get()) {
+ MemoryManager.instance().incrementMemoryFileStorage(numBytes);
+ } else {
+ MemoryManager.instance().incrementDiskBuffer(numBytes);
+ if (userCongestionControlContext != null) {
+ userCongestionControlContext.updateProduceBytes(numBytes);
+ }
+ }
+
data.retain();
flushBuffer.addComponent(true, data);
if (isMemoryShuffleFile.get()) {
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index e2d9f6a41..479d19676 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -456,6 +456,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
@Test
public void testEvictAndChunkRead() throws Exception {
final int threadsNum = 16;
+ final long memoryFileStorageBefore =
MemoryManager.instance().getMemoryFileStorageCounter();
PartitionDataWriter partitionDataWriter =
new ReducePartitionDataWriter(
PartitionDataWriterSuiteUtils.prepareMemoryEvictEnvironment(
@@ -526,6 +527,9 @@ public class MemoryReducePartitionDataWriterSuiteJ {
result.releaseBuffers();
closeChunkServer();
+
+ assert storageManager.evictedFileCount().get() > 0;
+ assert MemoryManager.instance().getMemoryFileStorageCounter() ==
memoryFileStorageBefore;
}
@Test