This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0f0d3b6d [Improvement] Potential memory leak when encountering an unhealthy
disk (#370)
0f0d3b6d is described below
commit 0f0d3b6dfd816b81ec7767e80fc0c89624f51238
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Nov 29 14:04:26 2022 +0800
[Improvement] Potential memory leak when encountering an unhealthy disk (#370)
### What changes were proposed in this pull request?
Fix a potential memory leak when encountering an unhealthy disk.
### Why are the changes needed?
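When a disk is unhealthy and a pending flush event (PendingShuffleFlushEvent)
exceeds its timeout, the event is dropped and its memory is released. However,
in the current codebase the dropped event's data is not cleared from the shuffle
buffer's in-flush buffer, so the data reference is retained and memory leaks.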
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No need.
---
.../apache/uniffle/server/ShuffleFlushManager.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 45e329d1..b2a67853 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -210,12 +210,8 @@ public class ShuffleFlushManager {
// just log the error, don't throw the exception and stop the flush
thread
LOG.error("Exception happened when process flush shuffle data for " +
event, e);
} finally {
- ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
- if (shuffleBuffer != null) {
- shuffleBuffer.clearInFlushBuffer(event.getEventId());
- }
+ cleanupFlushEventData(event);
if (shuffleServer != null) {
- shuffleServer.getShuffleBufferManager().releaseMemory(event.getSize(),
true, false);
long duration = System.currentTimeMillis() - start;
if (writeSuccess) {
LOG.debug("Flush to file success in " + duration + " ms and release
" + event.getSize() + " bytes");
@@ -312,14 +308,22 @@ public class ShuffleFlushManager {
addPendingEventsInternal(event);
}
- private void dropPendingEvent(PendingShuffleFlushEvent event) {
- ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+ private void cleanupFlushEventData(ShuffleDataFlushEvent event) {
+ ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
+ if (shuffleBuffer != null) {
+ shuffleBuffer.clearInFlushBuffer(event.getEventId());
+ }
if (shuffleServer != null) {
shuffleServer.getShuffleBufferManager().releaseMemory(
- event.getEvent().getSize(), true, false);
+ event.getSize(), true, false);
}
}
+ private void dropPendingEvent(PendingShuffleFlushEvent event) {
+ ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+ cleanupFlushEventData(event.getEvent());
+ }
+
@VisibleForTesting
void addPendingEvents(ShuffleDataFlushEvent event) {
addPendingEventsInternal(new PendingShuffleFlushEvent(event));