This is an automated email from the ASF dual-hosted git repository.
lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a71c7a7e68 module distributedlog-common/distributedlog-protocol:
refactor ByteBuf release usage (#3693)
a71c7a7e68 is described below
commit a71c7a7e681e9908c04245d3d719d8f1c0a79176
Author: StevenLuMT <[email protected]>
AuthorDate: Wed Dec 7 19:48:50 2022 +0800
module distributedlog-common/distributedlog-protocol: refactor ByteBuf
release usage (#3693)
Co-authored-by: lushiji <[email protected]>
---
.../org/apache/distributedlog/io/TestCompressionCodec.java | 13 +++++++------
.../org/apache/distributedlog/EnvelopedRecordSetReader.java | 9 +++++----
.../org/apache/distributedlog/EnvelopedRecordSetWriter.java | 8 ++++----
.../src/main/java/org/apache/distributedlog/LogRecord.java | 3 ++-
4 files changed, 18 insertions(+), 15 deletions(-)
diff --git
a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
index c8edd99e60..fa60f95d94 100644
---
a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
+++
b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import org.junit.Test;
@@ -70,9 +71,9 @@ public class TestCompressionCodec {
decompressedBuf.readBytes(decompressedData);
assertArrayEquals("The decompressed bytes should be same as the
original bytes",
data, decompressedData);
- buf.release();
- compressedBuf.release();
- decompressedBuf.release();
+ ReferenceCountUtil.safeRelease(buf);
+ ReferenceCountUtil.safeRelease(compressedBuf);
+ ReferenceCountUtil.safeRelease(decompressedBuf);
}
private void testCompressionCodec2(CompressionCodec codec) throws
Exception {
@@ -93,9 +94,9 @@ public class TestCompressionCodec {
byte[] decompressedData = new byte[decompressedBuf.readableBytes()];
decompressedBuf.slice().readBytes(decompressedData);
- buffer.release();
- compressedBuf.release();
- decompressedBuf.release();
+ ReferenceCountUtil.safeRelease(buffer);
+ ReferenceCountUtil.safeRelease(compressedBuf);
+ ReferenceCountUtil.safeRelease(decompressedBuf);
}
}
diff --git
a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
index 83a950ed7c..fd2bbd4905 100644
---
a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
+++
b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
@@ -22,6 +22,7 @@ import static
org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
import static org.apache.distributedlog.LogRecordSet.VERSION;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.io.CompressionCodec;
@@ -80,10 +81,10 @@ class EnvelopedRecordSetReader implements
LogRecordSet.Reader {
CompressionCodec codec =
CompressionUtils.getCompressionCodec(Type.of(codecCode));
this.reader = codec.decompress(compressedBuf, decompressedDataLen);
} finally {
- compressedBuf.release();
+ ReferenceCountUtil.safeRelease(compressedBuf);
}
if (numRecords == 0) {
- this.reader.release();
+ ReferenceCountUtil.safeRelease(this.reader);
}
}
@@ -110,7 +111,7 @@ class EnvelopedRecordSetReader implements
LogRecordSet.Reader {
// release the record set buffer when exhausting the reader
if (0 == numRecords) {
- this.reader.release();
+ ReferenceCountUtil.safeRelease(this.reader);
}
return record;
@@ -120,7 +121,7 @@ class EnvelopedRecordSetReader implements
LogRecordSet.Reader {
public void release() {
if (0 != numRecords) {
numRecords = 0;
- reader.release();
+ ReferenceCountUtil.safeRelease(reader);
}
}
}
diff --git
a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
index ea1824ef51..78d19c3989 100644
---
a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
+++
b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
@@ -157,14 +157,14 @@ class EnvelopedRecordSetWriter implements
LogRecordSet.Writer {
@Override
public synchronized void completeTransmit(long lssn, long entryId, long
startSlotId) {
satisfyPromises(lssn, entryId, startSlotId);
- buffer.release();
- ReferenceCountUtil.release(recordSetBuffer);
+ ReferenceCountUtil.safeRelease(buffer);
+ ReferenceCountUtil.safeRelease(recordSetBuffer);
}
@Override
public synchronized void abortTransmit(Throwable reason) {
cancelPromises(reason);
- buffer.release();
- ReferenceCountUtil.release(recordSetBuffer);
+ ReferenceCountUtil.safeRelease(buffer);
+ ReferenceCountUtil.safeRelease(recordSetBuffer);
}
}
diff --git
a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
index aeae06b75e..157ce184ce 100644
---
a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
+++
b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -225,7 +226,7 @@ public class LogRecord {
void setPayloadBuf(ByteBuf payload, boolean copyData) {
if (null != this.payload) {
- this.payload.release();
+ ReferenceCountUtil.safeRelease(this.payload);
}
if (copyData) {
this.payload = Unpooled.copiedBuffer(payload);