This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 57f0f8bbc [#1930] fix(server): Fix a double release issue about clearResource thread (#1931)
57f0f8bbc is described below
commit 57f0f8bbcd530feb6ab1284c2eed0c08b9e2c9ae
Author: maobaolong <[email protected]>
AuthorDate: Fri Aug 23 01:47:09 2024 +0800
[#1930] fix(server): Fix a double release issue about clearResource thread (#1931)
### What changes were proposed in this pull request?
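Fix a double-release issue in the clearResource thread. `ShuffleBuffer.release()` now returns the total size of the blocks it actually released. Blocks whose data fails to release (possibly because they were already released elsewhere) are skipped and logged. `ShuffleBufferManager` then frees only that returned size, instead of the full buffer size.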
### Why are the changes needed?
Fix: #1930
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
---
.../server/buffer/AbstractShuffleBuffer.java | 2 +-
.../uniffle/server/buffer/ShuffleBuffer.java | 2 +-
.../server/buffer/ShuffleBufferManager.java | 13 +++++++++++--
.../server/buffer/ShuffleBufferWithLinkedList.java | 22 ++++++++++++++++++++--
.../server/buffer/ShuffleBufferWithSkipList.java | 22 ++++++++++++++++++++--
5 files changed, 53 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index ddbeb21cf..3f4549c7b 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -37,7 +37,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
public abstract class AbstractShuffleBuffer implements ShuffleBuffer {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class);
protected long size;
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index f0d4dadb4..1b29e3b41 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -57,7 +57,12 @@ public interface ShuffleBuffer {
int getBlockCount();
- void release();
+ /**
+ * Releases the data of the blocks currently held by this buffer.
+ *
+ * @return the total size of the blocks whose data was released by this call; blocks whose
+ *     release fails are excluded, so callers should free only this amount of memory
+ */
+ long release();
void clearInFlushBuffer(long eventId);
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 1b432abee..8cade02ef 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -722,9 +722,18 @@ public class ShuffleBufferManager {
Collection<ShuffleBuffer> buffers = bufferRangeMap.asMapOfRanges().values();
if (buffers != null) {
for (ShuffleBuffer buffer : buffers) {
- buffer.release();
+ // Free only what this call actually released, so blocks that failed to release (e.g. already released elsewhere) are not freed twice.
+ long releasedSize = buffer.release();
ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
- releaseMemory(buffer.getSize(), false, false);
+ if (releasedSize != buffer.getSize()) {
+ LOG.warn(
+ "Release shuffle buffer size {} is not equal to buffer size {}, appId: {}, shuffleId: {}",
+ releasedSize,
+ buffer.getSize(),
+ appId,
+ shuffleId);
+ }
+ releaseMemory(releasedSize, false, false);
}
}
if (shuffleIdToSizeMap != null) {
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index cd9d7ab66..a9e8ddc1a 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -110,8 +110,26 @@ public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer {
}
@Override
- public void release() {
- blocks.forEach(spb -> spb.getData().release());
+ public long release() {
+ Throwable lastException = null;
+ long failedToReleaseSize = 0; // long, like releasedSize: an int here would silently narrow on +=
+ long releasedSize = 0;
+ for (ShufflePartitionedBlock spb : blocks) {
+ try {
+ spb.getData().release();
+ releasedSize += spb.getSize();
+ } catch (Throwable t) { // best effort: keep releasing the remaining blocks
+ lastException = t;
+ failedToReleaseSize += spb.getSize();
+ }
+ }
+ if (lastException != null) {
+ LOG.warn(
+ "Failed to release shuffle blocks with size {}. Maybe it has been released by others.",
+ failedToReleaseSize,
+ lastException);
+ }
+ return releasedSize;
}
@Override
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 2419a7dd4..960ab94f5 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -111,8 +111,26 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer {
}
@Override
- public void release() {
- blocksMap.values().forEach(spb -> spb.getData().release());
+ public long release() {
+ Throwable lastException = null;
+ long failedToReleaseSize = 0; // long, like releasedSize: an int here would silently narrow on +=
+ long releasedSize = 0;
+ for (ShufflePartitionedBlock spb : blocksMap.values()) {
+ try {
+ spb.getData().release();
+ releasedSize += spb.getSize();
+ } catch (Throwable t) { // best effort: keep releasing the remaining blocks
+ lastException = t;
+ failedToReleaseSize += spb.getSize();
+ }
+ }
+ if (lastException != null) {
+ LOG.warn(
+ "Failed to release shuffle blocks with size {}. Maybe it has been released by others.",
+ failedToReleaseSize,
+ lastException);
+ }
+ return releasedSize;
}
@Override