This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-4.17 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.17 by this push:
     new 02a06c4be2 [fix] Fix ByteBuf release/retain in PerChannelBookClient (#4289)

02a06c4be2 is described below

commit 02a06c4be2cde4284e620fdb4c7800cb81d49cbf
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Thu Apr 18 06:37:29 2024 +0800

    [fix] Fix ByteBuf release/retain in PerChannelBookClient (#4289)

    * [fix] ByteBuf release/retain incorrect

    * improve the code comment

    * fix other cases

    * modify the code comment

    * improve the code

    * improve the test

    * add description
---
 .../bookkeeper/proto/PerChannelBookieClient.java   |  11 +-
 .../apache/bookkeeper/test/BookieClientTest.java   | 175 +++++++++++++++++++++
 2 files changed, 183 insertions(+), 3 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 5ebafe8eca..f1ff35f033 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -842,10 +842,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 cb, ctx, ledgerId, entryId));
         final Channel c = channel;
         if (c == null) {
-            // usually checked in writeAndFlush, but we have extra check
-            // because we need to release toSend.
+            // Manually release the buffer we built ourselves (variable "request"), since it cannot be sent out
+            // while the channel is switching.
             errorOut(completionKey);
-            ReferenceCountUtil.release(toSend);
+            ReferenceCountUtil.release(request);
             return;
         } else {
             // addEntry times out on backpressure
@@ -1180,6 +1180,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         if (channel == null) {
             LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
             errorOut(key);
+            ReferenceCountUtil.release(request);
             return;
         }
 
@@ -1194,6 +1195,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     StringUtils.requestToString(request));
 
             errorOut(key, BKException.Code.TooManyRequestsException);
+            ReferenceCountUtil.release(request);
             return;
         }
 
@@ -1215,6 +1217,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
+            // Once the request reaches writeAndFlush, Netty takes care of releasing it, so any exception caught
+            // here means we still own the request and must release it.
+            ReferenceCountUtil.release(request);
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 60f89159a0..2fadbbd2c2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -27,8 +27,10 @@ import static org.junit.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.ReferenceCounted;
@@ -36,16 +38,20 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.MockUncleanShutdownDetection;
 import org.apache.bookkeeper.bookie.TestBookieImpl;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
@@ -57,6 +63,7 @@ import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieClientImpl;
+import org.apache.bookkeeper.proto.BookieProtoEncoding;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
@@ -64,6 +71,8 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
@@ -71,6 +80,7 @@ import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.IOUtils;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -78,6 +88,7 @@ import org.junit.Test;
 /**
  * Test the bookie client.
  */
+@Slf4j
 public class BookieClientTest {
     BookieServer bs;
     File tmpDir;
@@ -745,4 +756,168 @@ public class BookieClientTest {
             assertTrue(Arrays.equals(kbData, bytes));
         }
     }
+
+    /**
+     * Call stack of "BookieClientImpl.addEntry" exercised by this test:
+     * 1.`BookieClientImpl.addEntry`.
+     *   a.Retain the `ByteBuf` before getting the `PerChannelBookieClient`. We call this `ByteBuf` `toSend` in the
+     *     following sections. `toSend.refCnt` is `2` now.
+     * 2.`Get PerChannelBookieClient`.
+     * 3.`ChannelReadyForAddEntryCallback.operationComplete`
+     *   a.`PerChannelBookieClient.addEntry`
+     *     a-1.Build a new ByteBuf for the request command. We call this new `ByteBuf` `request` in the following
+     *       sections.
+     *     a-2.`channel.writeAndFlush(request)`, or release `request` when `channel` is switching.
+     *       Note the callback is called immediately if the channel is switching.
+     *   b.Release `toSend`, since it was retained in `step 1`. `toSend.refCnt` should be `1` now.
+     */
+    public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean smallPayload,
+                                            boolean withDelayReconnect, boolean withDelayAddEntry,
+                                            int tryTimes) throws Exception {
+        final long ledgerId = 1;
+        final BookieId addr = bs.getBookieId();
+        // Build passwd.
+        byte[] passwd = new byte[20];
+        Arrays.fill(passwd, (byte) 'a');
+        // Build digest manager.
+        DigestManager digestManager = DigestManager.instantiate(1, passwd,
+                BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.DUMMY),
+                PooledByteBufAllocator.DEFAULT, useV2WireProtocol);
+        // Build client.
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setUseV2WireProtocol(useV2WireProtocol);
+        BookieClientImpl client = new BookieClientImpl(clientConf, eventLoopGroup,
+                UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE,
+                BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+        // Inject a reconnect event.
+        // 1. Get the channel that will be used.
+        // 2. Call add entry.
+        // 3. Another thread closes the channel that is in use.
+        for (int i = 0; i < tryTimes; i++) {
+            long entryId = i + 1;
+            long lac = i;
+            // Build payload.
+            int payloadLen;
+            ByteBuf payload;
+            if (smallPayload) {
+                payloadLen = 1;
+                payload = PooledByteBufAllocator.DEFAULT.buffer(1);
+                payload.writeByte(1);
+            } else {
+                payloadLen = BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
+                payload = PooledByteBufAllocator.DEFAULT.buffer();
+                byte[] zeros = new byte[payloadLen];
+                payload.writeBytes(zeros);
+            }
+
+            // Digest.
+            ReferenceCounted bb = digestManager.computeDigestAndPackageForSending(entryId, lac,
+                    payloadLen * entryId, payload, passwd, BookieProtocol.FLAG_NONE);
+            log.info("Before send. bb.refCnf: {}", bb.refCnt());
+
+            // Step: get the channel that will be used.
+            PerChannelBookieClientPool perChannelBookieClientPool = client.lookupClient(addr);
+            AtomicReference<PerChannelBookieClient> perChannelBookieClient = new AtomicReference<>();
+            perChannelBookieClientPool.obtain((rc, result) -> perChannelBookieClient.set(result), ledgerId);
+            Awaitility.await().untilAsserted(() -> {
+                assertNotNull(perChannelBookieClient.get());
+            });
+
+            // Step: Inject a reconnect event.
+            final int delayMillis = i;
+            new Thread(() -> {
+                if (withDelayReconnect) {
+                    sleep(delayMillis);
+                }
+                Channel channel = WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel");
+                if (channel != null) {
+                    channel.close();
+                }
+            }).start();
+            if (withDelayAddEntry) {
+                sleep(delayMillis);
+            }
+
+            // Step: add entry.
+            AtomicBoolean callbackExecuted = new AtomicBoolean();
+            WriteCallback callback = (rc, lId, eId, socketAddr, ctx) -> {
+                log.info("Writing is finished. rc: {}, withDelayReconnect: {}, withDelayAddEntry: {}, ledgerId: {},"
+                        + " entryId: {}, socketAddr: {}, ctx: {}",
+                        rc, withDelayReconnect, withDelayAddEntry, lId, eId, socketAddr, ctx);
+                callbackExecuted.set(true);
+            };
+            client.addEntry(addr, ledgerId, passwd, entryId, bb, callback, i, BookieProtocol.FLAG_NONE, false,
+                    WriteFlag.NONE);
+            // Wait until the add-entry callback has been executed.
+            Awaitility.await().untilAsserted(() -> assertTrue(callbackExecuted.get()));
+            // The steps are explained in the method description.
+            // The callback fires in step "3-a-2", which always runs before step "3-b", so "callbackExecuted"
+            // may be set before step "3-b" finishes. Sleep briefly so "3-b" is done before checking the ref count.
+            Thread.sleep(100);
+            // Check the ref count.
+            Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
+                assertEquals(1, bb.refCnt());
+                // V2 releases the original payload itself if it is small.
+                if (!useV2WireProtocol && !smallPayload) {
+                    assertEquals(1, payload.refCnt());
+                }
+            });
+            bb.release();
+            // V2 releases the original payload itself if it is small.
+            if (!useV2WireProtocol && !smallPayload) {
+                payload.release();
+            }
+        }
+        // cleanup.
+        client.close();
+    }
+
+    /**
+     * Sleeps for {@code milliSeconds} milliseconds; does nothing when the value is not positive.
+     */
+    private void sleep(int milliSeconds) {
+        try {
+            if (milliSeconds > 0) {
+                Thread.sleep(milliSeconds);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("Error occurs", e);
+        }
+    }
+
+    /**
+     * Related to https://github.com/apache/bookkeeper/pull/4289.
+     */
+    @Test
+    public void testDataRefCnfWhenReconnectV2() throws Exception {
+        // Large payload.
+        // Running this test may not reproduce the issue; you can reproduce it reliably this way:
+        // 1. Add two breakpoints.
+        //   a. At the line "Channel c = channel" in the method PerChannelBookieClient.addEntry.
+        //   b. At the line "channel = null" in the method "PerChannelBookieClient.channelInactive".
+        // 2. While debugging, make breakpoint b trigger before breakpoint a.
+        testDataRefCnfWhenReconnect(true, false, false, false, 10);
+        testDataRefCnfWhenReconnect(true, false, true, false, 10);
+        testDataRefCnfWhenReconnect(true, false, false, true, 10);
+
+        // Small payload.
+        // This scenario works even without https://github.com/apache/bookkeeper/pull/4289; it is tested for coverage.
+        testDataRefCnfWhenReconnect(true, true, false, false, 10);
+        testDataRefCnfWhenReconnect(true, true, true, false, 10);
+        testDataRefCnfWhenReconnect(true, true, false, true, 10);
+    }
+
+    /**
+     * Please see the comment of the "Large payload" scenario in {@link #testDataRefCnfWhenReconnectV2()} if you
+     * cannot reproduce the issue when running this test.
+     * Related to https://github.com/apache/bookkeeper/pull/4289.
+     */
+    @Test
+    public void testDataRefCnfWhenReconnectV3() throws Exception {
+        testDataRefCnfWhenReconnect(false, true, false, false, 10);
+        testDataRefCnfWhenReconnect(false, true, true, false, 10);
+        testDataRefCnfWhenReconnect(false, true, false, true, 10);
+    }
 }