This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.16 by this push:
     new 6f30f00067 Fix TLS stability issues with V2 protocol that caused data corruption (#4404)
6f30f00067 is described below

commit 6f30f00067f92ab259b07bdb63bc61ff7c381e3a
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 29 09:49:18 2024 +0300

    Fix TLS stability issues with V2 protocol that caused data corruption (#4404)
    
    * Fix TLS stability issues with V2 protocol that caused data corruption
    - Add the TLS handler after the FlushConsolidationHandler
      - This makes TLS connections from Pulsar Broker to BookKeeper stable
        when bookkeeperUseV2WireProtocol=true is used
    - Fix the TestTLS test for the V2 protocol
    - Fix inconsistency in client configuration in BookKeeperClusterTestCase
    
    (cherry picked from commit 5f73147a2803a5147d9d9ba2d28eaa6c79c998a3)
---
 .../org/apache/bookkeeper/proto/BookieNettyServer.java   |  3 ++-
 .../apache/bookkeeper/proto/BookieRequestProcessor.java  |  9 ++++++++-
 .../apache/bookkeeper/proto/PerChannelBookieClient.java  | 16 ++++++++++++----
 .../bookkeeper/test/BookKeeperClusterTestCase.java       |  2 +-
 .../src/test/java/org/apache/bookkeeper/tls/TestTLS.java | 13 ++++---------
 5 files changed, 27 insertions(+), 16 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index a834208ce0..a7e919c826 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -92,6 +92,7 @@ import org.slf4j.LoggerFactory;
 class BookieNettyServer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(BookieNettyServer.class);
+    public static final String CONSOLIDATION_HANDLER_NAME = "consolidation";
 
     final int maxFrameSize;
     final ServerConfiguration conf;
@@ -344,7 +345,7 @@ class BookieNettyServer {
                         new BookieSideConnectionPeerContextHandler();
                     ChannelPipeline pipeline = ch.pipeline();
 
-                    pipeline.addLast("consolidation", new 
FlushConsolidationHandler(1024, true));
+                    pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new 
FlushConsolidationHandler(1024, true));
 
                     pipeline.addLast("bytebufList", ByteBufList.ENCODER);
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index a77b3d7bb5..0d9d61f634 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -66,6 +66,7 @@ import org.slf4j.MDC;
 public class BookieRequestProcessor implements RequestProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(BookieRequestProcessor.class);
+    public static final String TLS_HANDLER_NAME = "tls";
 
     /**
      * The server configuration. We use this for getting the number of add and 
read
@@ -576,9 +577,15 @@ public class BookieRequestProcessor implements 
RequestProcessor {
             response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
             writeAndFlush(c, response.build());
         } else {
+            LOG.info("Starting TLS handshake with client on channel {}", c);
             // there is no need to execute in a different thread as this 
operation is light
             SslHandler sslHandler = shFactory.newTLSHandler();
-            c.pipeline().addFirst("tls", sslHandler);
+            if 
(c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) {
+                
c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, 
TLS_HANDLER_NAME, sslHandler);
+            } else {
+                // local transport doesn't contain FlushConsolidationHandler
+                c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler);
+            }
 
             response.setStatus(BookkeeperProtocol.StatusCode.EOK);
             BookkeeperProtocol.StartTLSResponse.Builder builder = 
BookkeeperProtocol.StartTLSResponse.newBuilder();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index ca1448c768..149f97fc28 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -174,6 +174,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                         BKException.Code.WriteOnReadOnlyBookieException));
     private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add 
finer grained priority later.
     private static final AtomicLong txnIdGenerator = new AtomicLong(0);
+    static final String CONSOLIDATION_HANDLER_NAME = "consolidation";
 
     final BookieId bookieId;
     final BookieAddressResolver bookieAddressResolver;
@@ -594,7 +595,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             @Override
             protected void initChannel(Channel ch) throws Exception {
                 ChannelPipeline pipeline = ch.pipeline();
-                pipeline.addLast("consolidation", new 
FlushConsolidationHandler(1024, true));
+                pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new 
FlushConsolidationHandler(1024, true));
                 pipeline.addLast("bytebufList", ByteBufList.ENCODER);
                 pipeline.addLast("lengthbasedframedecoder",
                         new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 
0, 4));
@@ -1522,9 +1523,16 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         } else {
             throw new RuntimeException("Unexpected socket address type");
         }
-        SslHandler handler = 
parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
-        channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), 
handler);
-        handler.handshakeFuture().addListener(new 
GenericFutureListener<Future<Channel>>() {
+        LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), 
address.getPort());
+        SslHandler sslHandler = 
parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
+        String sslHandlerName = parentObj.shFactory.getHandlerName();
+        if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) {
+            channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME, 
sslHandlerName, sslHandler);
+        } else {
+            // local transport doesn't contain FlushConsolidationHandler
+            channel.pipeline().addFirst(sslHandlerName, sslHandler);
+        }
+        sslHandler.handshakeFuture().addListener(new 
GenericFutureListener<Future<Channel>>() {
                 @Override
                 public void operationComplete(Future<Channel> future) throws 
Exception {
                     int rc;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 5e705f724e..f6acefb587 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -301,7 +301,7 @@ public abstract class BookKeeperClusterTestCase {
     }
 
     protected ClientConfiguration newClientConfiguration() {
-        return new ClientConfiguration(baseConf);
+        return new ClientConfiguration(baseClientConf);
     }
 
     protected ServerConfiguration newServerConfiguration(int port, File 
journalDir, File[] ledgerDirs) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
index b5719deea0..f4d5d94197 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
@@ -349,11 +349,6 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testConnectToLocalTLSClusterTLSClient() throws Exception {
-        // skip test
-        if (useV2Protocol) {
-            return;
-        }
-
         restartBookies(c -> {
                 c.setDisableServerSocketBind(true);
                 c.setEnableLocalTransport(true);
@@ -621,10 +616,6 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() 
throws Exception {
-        if (useV2Protocol) {
-            return;
-        }
-
         restartBookies(c -> {
                 c.setBookieAuthProviderFactoryClass(
                         AllowOnlyClientsWithX509Certificates.class.getName());
@@ -755,6 +746,10 @@ public class TestTLS extends BookKeeperClusterTestCase {
             testClient(clientConf, numBookies);
             fail("Shouldn't be able to connect");
         } catch (BKException.BKUnauthorizedAccessException authFailed) {
+        } catch (BKException.BKNotEnoughBookiesException 
notEnoughBookiesException) {
+            if (!useV2Protocol) {
+                fail("Unexpected exception occurred.");
+            }
         }
 
         assertFalse(secureBookieSideChannel);

Reply via email to