This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.17 by this push:
     new 02a06c4be2 [fix] Fix ByteBuf release/retain in PerChannelBookieClient (#4289)
02a06c4be2 is described below

commit 02a06c4be2cde4284e620fdb4c7800cb81d49cbf
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Thu Apr 18 06:37:29 2024 +0800

    [fix] Fix ByteBuf release/retain in PerChannelBookieClient (#4289)
    
    * [fix] ByteBuf release/retain incorrect
    
    * improve the code comment
    
    * fix other cases
    
    * modify the code comment
    
    * improve the code
    
    * improve the test
    
    * add description
---
 .../bookkeeper/proto/PerChannelBookieClient.java   |  11 +-
 .../apache/bookkeeper/test/BookieClientTest.java   | 171 +++++++++++++++++++++
 2 files changed, 179 insertions(+), 3 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 5ebafe8eca..f1ff35f033 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -842,10 +842,10 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                                                    cb, ctx, ledgerId, 
entryId));
         final Channel c = channel;
         if (c == null) {
-            // usually checked in writeAndFlush, but we have extra check
-            // because we need to release toSend.
+            // Manually release the binary data(variable "request") that we 
manually created when it can not be sent out
+            // because the channel is switching.
             errorOut(completionKey);
-            ReferenceCountUtil.release(toSend);
+            ReferenceCountUtil.release(request);
             return;
         } else {
             // addEntry times out on backpressure
@@ -1180,6 +1180,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         if (channel == null) {
             LOG.warn("Operation {} failed: channel == null", 
StringUtils.requestToString(request));
             errorOut(key);
+            ReferenceCountUtil.release(request);
             return;
         }
 
@@ -1194,6 +1195,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                     StringUtils.requestToString(request));
 
             errorOut(key, BKException.Code.TooManyRequestsException);
+            ReferenceCountUtil.release(request);
             return;
         }
 
@@ -1215,6 +1217,9 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", 
StringUtils.requestToString(request), e);
             errorOut(key);
+            // If the request goes into the writeAndFlush, it should be 
handled well by Netty. So all the exceptions we
+            // get here, we can release the request.
+            ReferenceCountUtil.release(request);
         }
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 60f89159a0..2fadbbd2c2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -27,8 +27,10 @@ import static org.junit.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.ReferenceCounted;
@@ -36,16 +38,20 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.MockUncleanShutdownDetection;
 import org.apache.bookkeeper.bookie.TestBookieImpl;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
@@ -57,6 +63,7 @@ import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieClientImpl;
+import org.apache.bookkeeper.proto.BookieProtoEncoding;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookieServer;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
@@ -64,6 +71,8 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
@@ -71,6 +80,7 @@ import 
org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.IOUtils;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -78,6 +88,7 @@ import org.junit.Test;
 /**
  * Test the bookie client.
  */
+@Slf4j
 public class BookieClientTest {
     BookieServer bs;
     File tmpDir;
@@ -745,4 +756,164 @@ public class BookieClientTest {
             assertTrue(Arrays.equals(kbData, bytes));
         }
     }
+
+    /**
+     * Races a channel close against "BookieClientImpl.addEntry" and verifies the refCnt of the sent data. Stack:
+     * 1.`BookieClientImpl.addEntry`.
+     *   a.Retain the `ByteBuf` before getting the `PerChannelBookieClient`. We call this `ByteBuf` `toSend` in the
+     *     following sections. `toSend.refCnt` is `2` now.
+     * 2.`Get PerChannelBookieClient`.
+     * 3.`ChannelReadyForAddEntryCallback.operationComplete`
+     *   a.`PerChannelBookieClient.addEntry`
+     *     a-1.Build a new ByteBuf for the request command. We call this new `ByteBuf` `request` in the following
+     *       sections.
+     *     a-2.`channel.writeAndFlush(request)`, or release the `request` when the `channel` is switching.
+     *       Note the callback will be called immediately if the channel is switching.
+     *   b.Release the `ByteBuf` since it has been retained at `step 1`. `toSend.refCnt` should be `1` now.
+     */
+    public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean smallPayload,
+                                            boolean withDelayReconnect, boolean withDelayAddEntry,
+                                            int tryTimes) throws Exception {
+        final long ledgerId = 1;
+        final BookieId addr = bs.getBookieId();
+        // Build passwd.
+        byte[] passwd = new byte[20];
+        Arrays.fill(passwd, (byte) 'a');
+        // Build digest manager.
+        DigestManager digestManager = DigestManager.instantiate(1, passwd,
+                BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.DUMMY),
+                PooledByteBufAllocator.DEFAULT, useV2WireProtocol);
+        // Build client.
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setUseV2WireProtocol(useV2WireProtocol);
+        BookieClientImpl client = new BookieClientImpl(clientConf, eventLoopGroup,
+                UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE,
+                BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+        // Inject a reconnect event.
+        // 1. Get the channel that will be used.
+        // 2. Call add entry.
+        // 3. Another thread closes the channel that is in use.
+        for (int i = 0; i < tryTimes; i++) {
+            long entryId = i + 1;
+            long lac = i;
+            // Build payload (named "data" below so it does not shadow the BookieServer field "bs").
+            int payloadLen;
+            ByteBuf payload;
+            if (smallPayload) {
+                payloadLen = 1;
+                payload = PooledByteBufAllocator.DEFAULT.buffer(1);
+                payload.writeByte(1);
+            } else {
+                payloadLen = BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
+                payload = PooledByteBufAllocator.DEFAULT.buffer();
+                byte[] data = new byte[payloadLen];
+                payload.writeBytes(data);
+            }
+
+            // Digest.
+            ReferenceCounted bb = digestManager.computeDigestAndPackageForSending(entryId, lac,
+                    payloadLen * entryId, payload, passwd, BookieProtocol.FLAG_NONE);
+            log.info("Before send. bb.refCnf: {}", bb.refCnt());
+
+            // Step: get the channel that will be used.
+            PerChannelBookieClientPool perChannelBookieClientPool = client.lookupClient(addr);
+            AtomicReference<PerChannelBookieClient> perChannelBookieClient = new AtomicReference<>();
+            perChannelBookieClientPool.obtain((rc, result) -> perChannelBookieClient.set(result), ledgerId);
+            Awaitility.await().untilAsserted(() -> {
+                assertNotNull(perChannelBookieClient.get());
+            });
+
+            // Step: inject a reconnect event by closing the channel from another thread.
+            final int delayMillis = i;
+            new Thread(() -> {
+                if (withDelayReconnect) {
+                    sleep(delayMillis);
+                }
+                Channel channel = WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel");
+                if (channel != null) {
+                    channel.close();
+                }
+            }).start();
+            if (withDelayAddEntry) {
+                sleep(delayMillis);
+            }
+
+            // Step: add entry.
+            AtomicBoolean callbackExecuted = new AtomicBoolean();
+            WriteCallback callback = (rc, lId, eId, socketAddr, ctx) -> {
+                log.info("Writing is finished. rc: {}, withDelayReconnect: {}, withDelayAddEntry: {}, ledgerId: {},"
+                                + " entryId: {}, socketAddr: {}, ctx: {}",
+                        rc, withDelayReconnect, withDelayAddEntry, lId, eId, socketAddr, ctx);
+                callbackExecuted.set(true);
+            };
+            client.addEntry(addr, ledgerId, passwd, entryId, bb, callback, i, BookieProtocol.FLAG_NONE, false,
+                    WriteFlag.NONE);
+            // Wait for the add-entry callback to be invoked.
+            Awaitility.await().untilAsserted(() -> assertTrue(callbackExecuted.get()));
+            // The steps referenced below are explained in the method description.
+            // The callback fires during step "3-a-2", which always runs before step "3-b", so "callbackExecuted"
+            // being true does not mean step "3-b" has finished. Sleep briefly to let step "3-b" complete.
+            Thread.sleep(100);
+            // Check the ref count.
+            Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
+                assertEquals(1, bb.refCnt());
+                // Only checked for V3 with a large payload; in the other modes the encoder may already release it.
+                if (!useV2WireProtocol && !smallPayload) {
+                    assertEquals(1, payload.refCnt());
+                }
+            });
+            bb.release();
+            // Mirrors the check above: only V3 with a large payload still owns an independent payload reference.
+            if (!useV2WireProtocol && !smallPayload) {
+                payload.release();
+            }
+        }
+        // cleanup.
+        client.close();
+    }
+
+    private void sleep(int milliSeconds) {
+        try {
+            if (milliSeconds > 0) {
+                Thread.sleep(1);
+            }
+        } catch (InterruptedException e) {
+            log.warn("Error occurs", e);
+        }
+    }
+
+    /**
+     * Relate to https://github.com/apache/bookkeeper/pull/4289.
+     */
+    @Test
+    public void testDataRefCnfWhenReconnectV2() throws Exception {
+        // Large payload.
+        // Run this test may not reproduce the issue, you can reproduce the 
issue this way:
+        // 1. Add two break points.
+        //   a. At the line "Channel c = channel" in the method 
PerChannelBookieClient.addEntry.
+        //   b. At the line "channel = null" in the method 
"PerChannelBookieClient.channelInactive".
+        // 2. Make the break point b to run earlier than the break point a 
during debugging.
+        testDataRefCnfWhenReconnect(true, false, false, false, 10);
+        testDataRefCnfWhenReconnect(true, false, true, false, 10);
+        testDataRefCnfWhenReconnect(true, false, false, true, 10);
+
+        // Small payload.
+        // There is no issue without 
https://github.com/apache/bookkeeper/pull/4289, just add a test for this 
scenario.
+        testDataRefCnfWhenReconnect(true, true, false, false, 10);
+        testDataRefCnfWhenReconnect(true, true, true, false, 10);
+        testDataRefCnfWhenReconnect(true, true, false, true, 10);
+    }
+
+    /**
+     * V3 wire protocol variant (small payloads only); related to https://github.com/apache/bookkeeper/pull/4289.
+     * If running this test does not reproduce the issue, follow the breakpoint recipe of the "Large payload"
+     * scenario in {@link #testDataRefCnfWhenReconnectV2()}.
+     */
+    @Test
+    public void testDataRefCnfWhenReconnectV3() throws Exception {
+        for (boolean[] delay : new boolean[][] {{false, false}, {true, false}, {false, true}}) {
+            testDataRefCnfWhenReconnect(false, true, delay[0], delay[1], 10);
+        }
+    }
 }

Reply via email to