This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 8e352d4abea2fee63d3eb807528fdf370d3364a9
Author: StevenLuMT <[email protected]>
AuthorDate: Wed Dec 7 19:48:50 2022 +0800

    module distributedlog-common/distributedlog-protocol: refactor ByteBuf release usage (#3693)

    Co-authored-by: lushiji <[email protected]>
    (cherry picked from commit a71c7a7e681e9908c04245d3d719d8f1c0a79176)
---
 .../org/apache/distributedlog/io/TestCompressionCodec.java  | 13 +++++++------
 .../org/apache/distributedlog/EnvelopedRecordSetReader.java |  9 +++++----
 .../org/apache/distributedlog/EnvelopedRecordSetWriter.java |  8 ++++----
 .../src/main/java/org/apache/distributedlog/LogRecord.java  |  3 ++-
 4 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java index c8edd99e60..fa60f95d94 100644 --- a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java +++ b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.nio.ByteBuffer; import org.junit.Test; @@ -70,9 +71,9 @@ public class TestCompressionCodec { decompressedBuf.readBytes(decompressedData); assertArrayEquals("The decompressed bytes should be same as the original bytes", data, decompressedData); - buf.release(); - compressedBuf.release(); - decompressedBuf.release(); + ReferenceCountUtil.safeRelease(buf); + ReferenceCountUtil.safeRelease(compressedBuf); + ReferenceCountUtil.safeRelease(decompressedBuf); } private void testCompressionCodec2(CompressionCodec codec) throws Exception { @@ -93,9 +94,9 @@ public class TestCompressionCodec { byte[] decompressedData = new 
byte[decompressedBuf.readableBytes()]; decompressedBuf.slice().readBytes(decompressedData); - buffer.release(); - compressedBuf.release(); - decompressedBuf.release(); + ReferenceCountUtil.safeRelease(buffer); + ReferenceCountUtil.safeRelease(compressedBuf); + ReferenceCountUtil.safeRelease(decompressedBuf); } } diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java index 83a950ed7c..fd2bbd4905 100644 --- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java +++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java @@ -22,6 +22,7 @@ import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK; import static org.apache.distributedlog.LogRecordSet.VERSION; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.io.CompressionCodec; @@ -80,10 +81,10 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader { CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode)); this.reader = codec.decompress(compressedBuf, decompressedDataLen); } finally { - compressedBuf.release(); + ReferenceCountUtil.safeRelease(compressedBuf); } if (numRecords == 0) { - this.reader.release(); + ReferenceCountUtil.safeRelease(this.reader); } } @@ -110,7 +111,7 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader { // release the record set buffer when exhausting the reader if (0 == numRecords) { - this.reader.release(); + ReferenceCountUtil.safeRelease(this.reader); } return record; @@ -120,7 +121,7 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader { public void release() { if (0 != numRecords) { numRecords = 0; - reader.release(); + 
ReferenceCountUtil.safeRelease(reader); } } } diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java index ea1824ef51..78d19c3989 100644 --- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java +++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java @@ -157,14 +157,14 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer { @Override public synchronized void completeTransmit(long lssn, long entryId, long startSlotId) { satisfyPromises(lssn, entryId, startSlotId); - buffer.release(); - ReferenceCountUtil.release(recordSetBuffer); + ReferenceCountUtil.safeRelease(buffer); + ReferenceCountUtil.safeRelease(recordSetBuffer); } @Override public synchronized void abortTransmit(Throwable reason) { cancelPromises(reason); - buffer.release(); - ReferenceCountUtil.release(recordSetBuffer); + ReferenceCountUtil.safeRelease(buffer); + ReferenceCountUtil.safeRelease(recordSetBuffer); } } diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java index 63b694aa31..ee98bf5f32 100644 --- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java +++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -225,7 +226,7 @@ public class LogRecord { void setPayloadBuf(ByteBuf payload, boolean copyData) { if (null != 
this.payload) { - this.payload.release(); + ReferenceCountUtil.safeRelease(this.payload); } if (copyData) { this.payload = Unpooled.copiedBuffer(payload);
