This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fae84051 [CELEBORN-1210] Fix potential memory leak in PartitionFilesCleaner
4fae84051 is described below

commit 4fae840513b588cd53cd1047c342f9be5f263634
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jan 5 18:12:18 2024 +0800

    [CELEBORN-1210] Fix potential memory leak in PartitionFilesCleaner
    
    ### What changes were proposed in this pull request?
    
    1. Make `cleanupExpiredShuffleKey` hold the same lock as `add` and the cleaner thread.
    2. Fix element removal during iteration.
    3. Wait with an untimed `condition.await()` instead of polling with `condition.await(500ms)`; the surrounding `while` loop stays to handle spurious wakeups.
    
    ### Why are the changes needed?
    
    `PartitionFilesCleaner` does not use its `LinkedBlockingQueue queue` as a typical producer-consumer queue but as an order-agnostic buffer, which confused me on first reading.
    
    `LinkedBlockingQueue` is a thread-safe collection, so iterating it while it is being modified does not throw a `ConcurrentModificationException`, but the iteration is not guaranteed to see every queue entry. Because `cleanupExpiredShuffleKey` is not guarded by the lock, element removal by the cleaner thread may disrupt its iteration and eventually cause a memory leak.
    
    Ref: 
https://stackoverflow.com/questions/37945981/concurrently-iterating-over-a-blockingqueue
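    
    The locking discipline described above can be sketched roughly as follows. This is a minimal illustration, not the Celeborn code: `GuardedBuffer`, `takeReady`, and `removeExpired` are hypothetical names, and a plain `ArrayList` of strings stands in for the buffer of file sorters. Every access to the buffer goes through the same lock, and the wait is an untimed `await()` inside a `while` loop.

    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Predicate;

    // Hypothetical stand-in for an order-agnostic buffer guarded by a single lock.
    final class GuardedBuffer {
      private final List<String> keys = new ArrayList<>();
      private final Lock lock = new ReentrantLock();
      private final Condition notEmpty = lock.newCondition();

      // Producer side: add under the lock and wake up one waiter.
      void add(String key) {
        lock.lock();
        try {
          keys.add(key);
          notEmpty.signal();
        } finally {
          lock.unlock();
        }
      }

      // Consumer side: block until the buffer is non-empty, then remove and
      // return every entry that matches `ready`.
      List<String> takeReady(Predicate<String> ready) throws InterruptedException {
        lock.lockInterruptibly();
        try {
          // Re-check the condition in a loop to tolerate spurious wakeups.
          while (keys.isEmpty()) {
            notEmpty.await();
          }
          List<String> taken = new ArrayList<>();
          Iterator<String> it = keys.iterator();
          while (it.hasNext()) {
            String key = it.next();
            if (ready.test(key)) {
              taken.add(key);
              it.remove(); // remove through the iterator, not the collection
            }
          }
          return taken;
        } finally {
          lock.unlock();
        }
      }

      // Expiry path: holds the same lock as add() and takeReady().
      void removeExpired(Set<String> expired) {
        lock.lock();
        try {
          keys.removeIf(expired::contains);
        } finally {
          lock.unlock();
        }
      }

      int size() {
        lock.lock();
        try {
          return keys.size();
        } finally {
          lock.unlock();
        }
      }
    }
    ```

    Because all three paths take the same lock, the expiry path can never iterate while another thread is removing entries, which is the situation the paragraph above warns about.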
    
    Another issue is removal during iteration. The typical usage is
    ```
    Iterator<E> itr = collection.iterator();
    while (itr.hasNext()) {
        if (condition(itr.next())) {
            itr.remove();
        }
    }
    ```
    The key point is to call `itr.remove()` rather than `collection.remove(x)`.
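    
    To make the difference concrete, here is a small self-contained sketch. It deliberately uses an `ArrayList` rather than a `LinkedBlockingQueue`, because `ArrayList` iterators are fail-fast and make the problem visible as an exception; `LinkedBlockingQueue` iterators are weakly consistent and do not throw. The class and method names are hypothetical.

    ```java
    import java.util.ArrayList;
    import java.util.ConcurrentModificationException;
    import java.util.Iterator;
    import java.util.List;

    final class IteratorRemovalDemo {
      // Removes even numbers through the iterator, which is the supported way.
      static List<Integer> removeEvensViaIterator(List<Integer> input) {
        List<Integer> list = new ArrayList<>(input);
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
          if (it.next() % 2 == 0) {
            it.remove();
          }
        }
        return list;
      }

      // Removes through the collection while iterating and reports whether
      // the fail-fast iterator detected the modification.
      static boolean collectionRemoveFailsFast(List<Integer> input) {
        List<Integer> list = new ArrayList<>(input);
        try {
          Iterator<Integer> it = list.iterator();
          while (it.hasNext()) {
            Integer value = it.next();
            if (value % 2 == 0) {
              list.remove(value); // remove(Object), bypasses the iterator
            }
          }
          return false;
        } catch (ConcurrentModificationException e) {
          return true;
        }
      }
    }
    ```

    Fail-fast detection is best effort, so the second method is not guaranteed to throw for every input (removing the second-to-last element, for instance, ends the loop silently). That unpredictability is one more reason to prefer `itr.remove()` or `removeIf`.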
    
    ### Does this PR introduce _any_ user-facing change?
    
    I suppose there is a potential memory leak issue on the Worker without this patch.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #2205 from pan3793/CELEBORN-1210.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../worker/storage/PartitionFilesSorter.java       | 28 +++++++++++-----------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 97d0d8498..967f0db60 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -757,7 +757,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
 class PartitionFilesCleaner {
   private static final Logger logger = LoggerFactory.getLogger(PartitionFilesCleaner.class);
 
-  private final LinkedBlockingQueue<PartitionFilesSorter.FileSorter> queue =
+  private final LinkedBlockingQueue<PartitionFilesSorter.FileSorter> fileSorters =
       new LinkedBlockingQueue<>();
   private final Lock lock = new ReentrantLock();
   private final Condition notEmpty = lock.newCondition();
@@ -769,12 +769,13 @@ class PartitionFilesCleaner {
             () -> {
               try {
                 while (!partitionFilesSorter.isShutdown()) {
+                  lock.lockInterruptibly();
                   try {
-                    lock.lockInterruptibly();
-                    while (queue.isEmpty()) {
-                      notEmpty.await(500, TimeUnit.MILLISECONDS);
+                    // CELEBORN-1210: use while instead of if in case of spurious wakeup.
+                    while (fileSorters.isEmpty()) {
+                      notEmpty.await();
                     }
-                    Iterator<PartitionFilesSorter.FileSorter> it = queue.iterator();
+                    Iterator<PartitionFilesSorter.FileSorter> it = fileSorters.iterator();
                     while (it.hasNext()) {
                       PartitionFilesSorter.FileSorter sorter = it.next();
                       try {
@@ -784,7 +785,7 @@ class PartitionFilesCleaner {
                               sorter.getShuffleKey(),
                               ((DiskFileInfo) sorter.getOriginFileInfo()).getFilePath());
                           sorter.deleteOriginFiles();
-                          queue.remove(sorter);
+                          it.remove();
                         }
                       } catch (IOException e) {
                         logger.error("catch IOException when delete origin files", e);
@@ -806,7 +807,7 @@ class PartitionFilesCleaner {
   public void add(PartitionFilesSorter.FileSorter fileSorter) throws InterruptedException {
     lock.lockInterruptibly();
     try {
-      queue.add(fileSorter);
+      fileSorters.add(fileSorter);
       notEmpty.signal();
     } finally {
       lock.unlock();
@@ -814,17 +815,16 @@ class PartitionFilesCleaner {
   }
 
   public void cleanupExpiredShuffleKey(Set<String> expiredShuffleKeys) {
-    Iterator<PartitionFilesSorter.FileSorter> it = queue.iterator();
-    while (it.hasNext()) {
-      PartitionFilesSorter.FileSorter sorter = it.next();
-      if (expiredShuffleKeys.contains(sorter.getShuffleKey())) {
-        queue.remove(sorter);
-      }
+    lock.lock();
+    try {
+      fileSorters.removeIf(sorter -> expiredShuffleKeys.contains(sorter.getShuffleKey()));
+    } finally {
+      lock.unlock();
     }
   }
 
   public void close() {
-    queue.clear();
+    fileSorters.clear();
     cleaner.interrupt();
   }
 }
