This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8949e0e6 [#133] feat(netty): Fix IllegalReferenceCountException. (#899)
8949e0e6 is described below
commit 8949e0e64983ba2f49dc857bdea26ac6733db04b
Author: Xianming Lei <[email protected]>
AuthorDate: Tue May 23 19:05:00 2023 +0800
[#133] feat(netty): Fix IllegalReferenceCountException. (#899)
### What changes were proposed in this pull request?
The ByteBuf.release() of the block should be called after
clearInFlushBuffer is called, otherwise there will be a certain probability of
IllegalReferenceCountException at `ShuffleBuffer#updateShuffleData ->
data.addComponent(true, block.getData().retain());`
1.Fix IllegalReferenceCountException,
2.Added ByteBuf.release() when remove resources.
### Why are the changes needed?
Fix: #133
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
existing UTs.
Co-authored-by: leixianming <[email protected]>
---
.../main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java | 5 ++++-
.../java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java | 1 +
.../apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java | 1 -
3 files changed, 5 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index fcb6567e..cbefe9f8 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -107,7 +107,10 @@ public class ShuffleBuffer {
spBlocks,
isValid,
this);
- event.addCleanupCallback(() ->
this.clearInFlushBuffer(event.getEventId()));
+ event.addCleanupCallback(() -> {
+ this.clearInFlushBuffer(event.getEventId());
+ spBlocks.forEach(spb -> spb.getData().release());
+ });
inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
blocks.clear();
size = 0;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ca5759fa..0eaf2b07 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -563,6 +563,7 @@ public class ShuffleBufferManager {
Collection<ShuffleBuffer> buffers =
bufferRangeMap.asMapOfRanges().values();
if (buffers != null) {
for (ShuffleBuffer buffer : buffers) {
+ buffer.getBlocks().forEach(spb -> spb.getData().release());
ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
size += buffer.getSize();
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 25f504fb..6e28cceb 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -108,7 +108,6 @@ public class LocalFileWriteHandler implements
ShuffleWriteHandler {
long crc = block.getCrc();
long startOffset = dataWriter.nextOffset();
dataWriter.writeData(ByteBufUtils.readBytes(block.getData()));
- block.getData().release();
FileBasedShuffleSegment segment = new FileBasedShuffleSegment(
blockId, startOffset, block.getLength(),
block.getUncompressLength(), crc, block.getTaskAttemptId());