This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f0d3b6d [Improvement] Potential memory leak when encountering disk 
unhealthy (#370)
0f0d3b6d is described below

commit 0f0d3b6dfd816b81ec7767e80fc0c89624f51238
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Nov 29 14:04:26 2022 +0800

    [Improvement] Potential memory leak when encountering disk unhealthy (#370)
    
    ### What changes were proposed in this pull request?
    
    Fix potential memory leak when encountering disk unhealthy
    
    ### Why are the changes needed?
    
    When a disk is unhealthy and a pendingDataShuffleFlushEvent exceeds its 
timeout, the event's memory is released. But in the current codebase, the data 
reference is not released, which causes a memory leak.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    No need.
---
 .../apache/uniffle/server/ShuffleFlushManager.java   | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 45e329d1..b2a67853 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -210,12 +210,8 @@ public class ShuffleFlushManager {
       // just log the error, don't throw the exception and stop the flush 
thread
       LOG.error("Exception happened when process flush shuffle data for " + 
event, e);
     } finally {
-      ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
-      if (shuffleBuffer != null) {
-        shuffleBuffer.clearInFlushBuffer(event.getEventId());
-      }
+      cleanupFlushEventData(event);
       if (shuffleServer != null) {
-        shuffleServer.getShuffleBufferManager().releaseMemory(event.getSize(), 
true, false);
         long duration = System.currentTimeMillis() - start;
         if (writeSuccess) {
           LOG.debug("Flush to file success in " + duration + " ms and release 
" + event.getSize() + " bytes");
@@ -312,14 +308,22 @@ public class ShuffleFlushManager {
     addPendingEventsInternal(event);
   }
 
-  private void dropPendingEvent(PendingShuffleFlushEvent event) {
-    ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+  private void cleanupFlushEventData(ShuffleDataFlushEvent event) {
+    ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
+    if (shuffleBuffer != null) {
+      shuffleBuffer.clearInFlushBuffer(event.getEventId());
+    }
     if (shuffleServer != null) {
       shuffleServer.getShuffleBufferManager().releaseMemory(
-          event.getEvent().getSize(), true, false);
+          event.getSize(), true, false);
     }
   }
 
+  private void dropPendingEvent(PendingShuffleFlushEvent event) {
+    ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+    cleanupFlushEventData(event.getEvent());
+  }
+
   @VisibleForTesting
   void addPendingEvents(ShuffleDataFlushEvent event) {
     addPendingEventsInternal(new PendingShuffleFlushEvent(event));

Reply via email to