This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4fae84051 [CELEBORN-1210] Fix potential memory leak in PartitionFilesCleaner
4fae84051 is described below
commit 4fae840513b588cd53cd1047c342f9be5f263634
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jan 5 18:12:18 2024 +0800
[CELEBORN-1210] Fix potential memory leak in PartitionFilesCleaner
### What changes were proposed in this pull request?
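1. Let `cleanupExpiredShuffleKey` hold the same lock as `add` and the cleaner
thread.
2. Fix the removal during iteration by calling `it.remove()` instead of
`queue.remove(sorter)`.
3. Call `notEmpty.await()` (still inside a `while` loop, to guard against
spurious wakeups) instead of polling with
`notEmpty.await(500, TimeUnit.MILLISECONDS)`.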
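The locking pattern described in the list above can be sketched in isolation. This is a minimal illustration, not the Celeborn code: `GuardedBuffer`, `removeAll`, and `drain` are hypothetical names, and a plain `ArrayList` of strings stands in for the buffer of `FileSorter` entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for an order-agnostic buffer guarded by a single lock.
class GuardedBuffer {
  private final List<String> items = new ArrayList<>();
  private final Lock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();

  // Producer side: add under the lock and wake up one waiting consumer.
  void add(String item) {
    lock.lock();
    try {
      items.add(item);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  // Cleanup side: takes the same lock as add() and drain(), so it never
  // interleaves with an iteration running in another thread.
  boolean removeAll(Set<String> expired) {
    lock.lock();
    try {
      return items.removeIf(expired::contains);
    } finally {
      lock.unlock();
    }
  }

  // Consumer side: block without a timeout until something is present.
  // The while loop re-checks the predicate in case of a spurious wakeup.
  List<String> drain() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      while (items.isEmpty()) {
        notEmpty.await();
      }
      List<String> snapshot = new ArrayList<>(items);
      items.clear();
      return snapshot;
    } finally {
      lock.unlock();
    }
  }
}
```

Because every access goes through the same `ReentrantLock`, the sketch does not rely on the thread safety of the underlying collection at all.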
### Why are the changes needed?
The `LinkedBlockingQueue queue` in `PartitionFilesCleaner` is not used as a
typical producer-consumer queue but as an order-agnostic buffer, which confused
me on first reading.
Although `LinkedBlockingQueue` is a thread-safe collection, that only means no
`ConcurrentModificationException` is thrown if the queue is modified while it
is being iterated; the iteration is not guaranteed to see all queue entries.
Because `cleanupExpiredShuffleKey` is not guarded by the lock, element removal
by the cleaner thread may break its iteration, leaving entries behind and
eventually causing a memory leak.
Ref:
https://stackoverflow.com/questions/37945981/concurrently-iterating-over-a-blockingqueue
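The difference between a fail-fast iterator and the weakly consistent iterator of `LinkedBlockingQueue` can be observed even in a single thread. The sketch below uses a hypothetical helper, `IterationDemo.throwsOnRemovalDuringIteration`, and only demonstrates the absence of `ConcurrentModificationException`; the missed entries described above depend on thread timing and are not reproduced here.

```java
import java.util.Collection;
import java.util.ConcurrentModificationException;

// Hypothetical helper, for illustration only.
class IterationDemo {
  // Removes elements through the collection itself (not the iterator) while
  // iterating, and reports whether a ConcurrentModificationException occurred.
  static boolean throwsOnRemovalDuringIteration(Collection<Integer> c) {
    try {
      for (Integer x : c) {
        c.remove(x);
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }
}
```

Called with an `ArrayList` holding three elements, the helper returns `true`; called with a `LinkedBlockingQueue` holding the same elements, it returns `false`. The silence of the queue is what makes a missed entry easy to overlook.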
Another issue is the removal inside the iteration. The typical usage is:
```
// shouldRemove(...) is a placeholder for the actual removal condition
Iterator<E> itr = collection.iterator();
while (itr.hasNext()) {
  if (shouldRemove(itr.next())) {
    itr.remove();
  }
}
```
The key point is that `itr.remove()` should be called instead of
`collection.remove(x)`.
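As a self-contained sketch of the idiom, assuming a hypothetical helper named `RemovalDemo.removeWithIterator`, the loop can be written generically. For collections that support it, `Collection.removeIf` expresses the same filter in one call.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical helper, for illustration only.
class RemovalDemo {
  // Removes every element matching the predicate through the iterator, so the
  // iterator's own position stays consistent with the collection.
  static <T> void removeWithIterator(Collection<T> collection, Predicate<T> shouldRemove) {
    Iterator<T> itr = collection.iterator();
    while (itr.hasNext()) {
      if (shouldRemove.test(itr.next())) {
        itr.remove();
      }
    }
  }
}
```

For example, `RemovalDemo.removeWithIterator(list, x -> x % 2 == 0)` drops the even numbers from a mutable list of integers and has the same effect as `list.removeIf(x -> x % 2 == 0)`.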
### Does this PR introduce _any_ user-facing change?
I suppose there is a potential memory leak issue on the Worker without this
patch.
### How was this patch tested?
Passes GA (GitHub Actions).
Closes #2205 from pan3793/CELEBORN-1210.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../worker/storage/PartitionFilesSorter.java | 28 +++++++++++-----------
1 file changed, 14 insertions(+), 14 deletions(-)
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 97d0d8498..967f0db60 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -757,7 +757,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
class PartitionFilesCleaner {
private static final Logger logger = LoggerFactory.getLogger(PartitionFilesCleaner.class);
- private final LinkedBlockingQueue<PartitionFilesSorter.FileSorter> queue =
+ private final LinkedBlockingQueue<PartitionFilesSorter.FileSorter> fileSorters =
new LinkedBlockingQueue<>();
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
@@ -769,12 +769,13 @@ class PartitionFilesCleaner {
() -> {
try {
while (!partitionFilesSorter.isShutdown()) {
+ lock.lockInterruptibly();
try {
- lock.lockInterruptibly();
- while (queue.isEmpty()) {
- notEmpty.await(500, TimeUnit.MILLISECONDS);
+ // CELEBORN-1210: use while instead of if in case of spurious wakeup.
+ while (fileSorters.isEmpty()) {
+ notEmpty.await();
}
- Iterator<PartitionFilesSorter.FileSorter> it = queue.iterator();
+ Iterator<PartitionFilesSorter.FileSorter> it = fileSorters.iterator();
while (it.hasNext()) {
PartitionFilesSorter.FileSorter sorter = it.next();
try {
@@ -784,7 +785,7 @@ class PartitionFilesCleaner {
sorter.getShuffleKey(),
((DiskFileInfo) sorter.getOriginFileInfo()).getFilePath());
sorter.deleteOriginFiles();
- queue.remove(sorter);
+ it.remove();
}
} catch (IOException e) {
logger.error("catch IOException when delete origin files", e);
@@ -806,7 +807,7 @@ class PartitionFilesCleaner {
public void add(PartitionFilesSorter.FileSorter fileSorter) throws InterruptedException {
lock.lockInterruptibly();
try {
- queue.add(fileSorter);
+ fileSorters.add(fileSorter);
notEmpty.signal();
} finally {
lock.unlock();
@@ -814,17 +815,16 @@ class PartitionFilesCleaner {
}
public void cleanupExpiredShuffleKey(Set<String> expiredShuffleKeys) {
- Iterator<PartitionFilesSorter.FileSorter> it = queue.iterator();
- while (it.hasNext()) {
- PartitionFilesSorter.FileSorter sorter = it.next();
- if (expiredShuffleKeys.contains(sorter.getShuffleKey())) {
- queue.remove(sorter);
- }
+ lock.lock();
+ try {
+ fileSorters.removeIf(sorter -> expiredShuffleKeys.contains(sorter.getShuffleKey()));
+ } finally {
+ lock.unlock();
}
}
public void close() {
- queue.clear();
+ fileSorters.clear();
cleaner.interrupt();
}
}