This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit eaf399629e0fc78c56e8e208bb9b5efc40b943d4 Author: StevenLuMT <[email protected]> AuthorDate: Tue Dec 6 15:21:21 2022 +0800 module distributedlog-core: refactor ByteBuf release usage (#3691) Co-authored-by: lushiji <[email protected]> (cherry picked from commit 7ae5a04a83984f1731781c7892613e8932d2f62e) --- .../src/main/java/org/apache/bookkeeper/client/LedgerReader.java | 3 ++- .../src/main/java/org/apache/distributedlog/EnvelopedEntry.java | 3 ++- .../java/org/apache/distributedlog/EnvelopedEntryReader.java | 3 ++- .../java/org/apache/distributedlog/EnvelopedEntryWriter.java | 8 ++++---- .../java/org/apache/distributedlog/tools/DistributedLogTool.java | 7 ++++--- .../core/src/test/java/org/apache/distributedlog/TestEntry.java | 9 +++++---- 6 files changed, 19 insertions(+), 14 deletions(-) diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java index f5850bf0ba..4fc3d05be2 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java +++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashSet; @@ -110,7 +111,7 @@ public class LedgerReader { eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress()); } finally { - buffer.release(); + ReferenceCountUtil.safeRelease(buffer); } } readResults.add(rr); diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java index 0e17929300..e0ccef949d 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java @@ -18,6 +18,7 @@ package org.apache.distributedlog; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.distributedlog.io.CompressionCodec; @@ -102,7 +103,7 @@ class EnvelopedEntry { CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode)); decompressedBuf = codec.decompress(compressedBuf, originDataLen); } finally { - compressedBuf.release(); + ReferenceCountUtil.safeRelease(compressedBuf); } return decompressedBuf; } diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java index 82656bab46..6db96e246e 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java @@ -19,6 +19,7 @@ package org.apache.distributedlog; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import javax.annotation.concurrent.NotThreadSafe; import org.apache.bookkeeper.stats.StatsLogger; @@ -80,7 +81,7 @@ class EnvelopedEntryReader implements Entry.Reader, RecordStream { private void releaseBuffer() { isExhausted = true; - this.src.release(); + ReferenceCountUtil.safeRelease(this.src); } @Override diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java index 733593ad60..e6497e9a5b 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java @@ -210,18 +210,18 @@ class EnvelopedEntryWriter implements Writer { @Override public void completeTransmit(long lssn, long entryId) { satisfyPromises(lssn, entryId); - buffer.release(); + ReferenceCountUtil.safeRelease(buffer); synchronized (this) { - ReferenceCountUtil.release(finalizedBuffer); + ReferenceCountUtil.safeRelease(finalizedBuffer); } } @Override public void abortTransmit(Throwable reason) { cancelPromises(reason); - buffer.release(); + ReferenceCountUtil.safeRelease(buffer); synchronized (this) { - ReferenceCountUtil.release(finalizedBuffer); + ReferenceCountUtil.safeRelease(finalizedBuffer); } } } diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java index bec9abff72..c752c1cd22 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java @@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -1719,7 +1720,7 @@ import org.slf4j.LoggerFactory; .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion())) .setEntry(lastEntry.getEntryBuffer()) .buildReader(); - lastEntry.getEntryBuffer().release(); + ReferenceCountUtil.safeRelease(lastEntry.getEntryBuffer()); LogRecordWithDLSN record = reader.nextRecord(); LogRecordWithDLSN lastRecord = null; while (null != record) { @@ -2033,7 +2034,7 @@ import org.slf4j.LoggerFactory; .setEntry(rr.getValue()) .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion)) .buildReader(); - rr.getValue().release(); + ReferenceCountUtil.safeRelease(rr.getValue()); printEntry(reader); } else { System.out.println("status = " + BKException.getMessage(rr.getResultCode())); @@ -2095,7 +2096,7 @@ import org.slf4j.LoggerFactory; .setEntry(entry.getEntryBuffer()) .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion)) .buildReader(); - entry.getEntryBuffer().release(); + ReferenceCountUtil.safeRelease(entry.getEntryBuffer()); printEntry(reader); ++i; } diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java index a7f413a788..3b0a9de9a6 100644 --- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java +++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue; import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -72,7 +73,7 @@ public class TestEntry { Assert.assertNull("Empty record set should return null", reader.nextRecord()); assertEquals(refCnt - 1, reader.getSrcBuf().refCnt()); - buffer.release(); + ReferenceCountUtil.safeRelease(buffer); } @Test(timeout = 20000) @@ -97,7 +98,7 @@ public class TestEntry { ByteBuf buffer = writer.getBuffer(); assertEquals("zero bytes", HEADER_LENGTH, buffer.readableBytes()); - buffer.release(); + ReferenceCountUtil.safeRelease(buffer); } @Test(timeout = 20000) @@ -158,7 +159,7 @@ public class TestEntry { .setEntryId(0L) .setEnvelopeEntry(true) .buildReader(); - buffer.release(); + ReferenceCountUtil.safeRelease(buffer); LogRecordWithDLSN record = reader.nextRecord(); int numReads = 0; long expectedTxid = 0L; @@ -276,7 +277,7 @@ public class TestEntry { new DLSN(1L, 1L, 12L), 0, 0, 3, new DLSN(1L, 1L, 12L), 12L); - buffer.release(); + ReferenceCountUtil.safeRelease(buffer); } void verifyReadResult(ByteBuf data,
