This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 8949e0e6 [#133] feat(netty): Fix IllegalReferenceCountException. (#899)
8949e0e6 is described below

commit 8949e0e64983ba2f49dc857bdea26ac6733db04b
Author: Xianming Lei <[email protected]>
AuthorDate: Tue May 23 19:05:00 2023 +0800

    [#133] feat(netty): Fix IllegalReferenceCountException. (#899)
    
    ### What changes were proposed in this pull request?
    
    The block's ByteBuf.release() must be called only after clearInFlushBuffer
    has been called; otherwise an IllegalReferenceCountException can
    intermittently be thrown at `ShuffleBuffer#updateShuffleData ->
    data.addComponent(true, block.getData().retain());`.
    1. Fix IllegalReferenceCountException.
    2. Add ByteBuf.release() when removing resources.
    
    ### Why are the changes needed?
    
    Fix: #133
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests (UTs).
    
    Co-authored-by: leixianming <[email protected]>
---
 .../main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java    | 5 ++++-
 .../java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java  | 1 +
 .../apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java   | 1 -
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index fcb6567e..cbefe9f8 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -107,7 +107,10 @@ public class ShuffleBuffer {
         spBlocks,
         isValid,
         this);
-    event.addCleanupCallback(() -> 
this.clearInFlushBuffer(event.getEventId()));
+    event.addCleanupCallback(() -> {
+      this.clearInFlushBuffer(event.getEventId());
+      spBlocks.forEach(spb -> spb.getData().release());
+    });
     inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
     blocks.clear();
     size = 0;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ca5759fa..0eaf2b07 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -563,6 +563,7 @@ public class ShuffleBufferManager {
       Collection<ShuffleBuffer> buffers = 
bufferRangeMap.asMapOfRanges().values();
       if (buffers != null) {
         for (ShuffleBuffer buffer : buffers) {
+          buffer.getBlocks().forEach(spb -> spb.getData().release());
           ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
           size += buffer.getSize();
         }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 25f504fb..6e28cceb 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -108,7 +108,6 @@ public class LocalFileWriteHandler implements 
ShuffleWriteHandler {
         long crc = block.getCrc();
         long startOffset = dataWriter.nextOffset();
         dataWriter.writeData(ByteBufUtils.readBytes(block.getData()));
-        block.getData().release();
 
         FileBasedShuffleSegment segment = new FileBasedShuffleSegment(
             blockId, startOffset, block.getLength(), 
block.getUncompressLength(), crc, block.getTaskAttemptId());

Reply via email to