This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.16 by this push:
new 6f30f00067 Fix TLS stability issues with V2 protocol that caused data
corruption (#4404)
6f30f00067 is described below
commit 6f30f00067f92ab259b07bdb63bc61ff7c381e3a
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 29 09:49:18 2024 +0300
Fix TLS stability issues with V2 protocol that caused data corruption
(#4404)
* Fix TLS stability issues with V2 protocol that caused data corruption
- Add the TLS handler after the FlushConsolidationHandler
- This makes TLS connections from the Pulsar broker to BookKeeper stable
when bookkeeperUseV2WireProtocol=true is used
- Fix the TestTLS test for the V2 protocol
- Fix inconsistency in client configuration in BookKeeperClusterTestCase
(cherry picked from commit 5f73147a2803a5147d9d9ba2d28eaa6c79c998a3)
---
.../org/apache/bookkeeper/proto/BookieNettyServer.java | 3 ++-
.../apache/bookkeeper/proto/BookieRequestProcessor.java | 9 ++++++++-
.../apache/bookkeeper/proto/PerChannelBookieClient.java | 16 ++++++++++++----
.../bookkeeper/test/BookKeeperClusterTestCase.java | 2 +-
.../src/test/java/org/apache/bookkeeper/tls/TestTLS.java | 13 ++++---------
5 files changed, 27 insertions(+), 16 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index a834208ce0..a7e919c826 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -92,6 +92,7 @@ import org.slf4j.LoggerFactory;
class BookieNettyServer {
private static final Logger LOG =
LoggerFactory.getLogger(BookieNettyServer.class);
+ public static final String CONSOLIDATION_HANDLER_NAME = "consolidation";
final int maxFrameSize;
final ServerConfiguration conf;
@@ -344,7 +345,7 @@ class BookieNettyServer {
new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("consolidation", new
FlushConsolidationHandler(1024, true));
+ pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new
FlushConsolidationHandler(1024, true));
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index a77b3d7bb5..0d9d61f634 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -66,6 +66,7 @@ import org.slf4j.MDC;
public class BookieRequestProcessor implements RequestProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(BookieRequestProcessor.class);
+ public static final String TLS_HANDLER_NAME = "tls";
/**
* The server configuration. We use this for getting the number of add and
read
@@ -576,9 +577,15 @@ public class BookieRequestProcessor implements
RequestProcessor {
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
writeAndFlush(c, response.build());
} else {
+ LOG.info("Starting TLS handshake with client on channel {}", c);
// there is no need to execute in a different thread as this
operation is light
SslHandler sslHandler = shFactory.newTLSHandler();
- c.pipeline().addFirst("tls", sslHandler);
+ if
(c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) {
+
c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME,
TLS_HANDLER_NAME, sslHandler);
+ } else {
+ // local transport doesn't contain FlushConsolidationHandler
+ c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler);
+ }
response.setStatus(BookkeeperProtocol.StatusCode.EOK);
BookkeeperProtocol.StartTLSResponse.Builder builder =
BookkeeperProtocol.StartTLSResponse.newBuilder();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index ca1448c768..149f97fc28 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -174,6 +174,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add
finer grained priority later.
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
+ static final String CONSOLIDATION_HANDLER_NAME = "consolidation";
final BookieId bookieId;
final BookieAddressResolver bookieAddressResolver;
@@ -594,7 +595,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("consolidation", new
FlushConsolidationHandler(1024, true));
+ pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new
FlushConsolidationHandler(1024, true));
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4,
0, 4));
@@ -1522,9 +1523,16 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
} else {
throw new RuntimeException("Unexpected socket address type");
}
- SslHandler handler =
parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
- channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(),
handler);
- handler.handshakeFuture().addListener(new
GenericFutureListener<Future<Channel>>() {
+ LOG.info("Starting TLS handshake with {}:{}", address.getHostString(),
address.getPort());
+ SslHandler sslHandler =
parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
+ String sslHandlerName = parentObj.shFactory.getHandlerName();
+ if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) {
+ channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME,
sslHandlerName, sslHandler);
+ } else {
+ // local transport doesn't contain FlushConsolidationHandler
+ channel.pipeline().addFirst(sslHandlerName, sslHandler);
+ }
+ sslHandler.handshakeFuture().addListener(new
GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws
Exception {
int rc;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 5e705f724e..f6acefb587 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -301,7 +301,7 @@ public abstract class BookKeeperClusterTestCase {
}
protected ClientConfiguration newClientConfiguration() {
- return new ClientConfiguration(baseConf);
+ return new ClientConfiguration(baseClientConf);
}
protected ServerConfiguration newServerConfiguration(int port, File
journalDir, File[] ledgerDirs) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
index b5719deea0..f4d5d94197 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
@@ -349,11 +349,6 @@ public class TestTLS extends BookKeeperClusterTestCase {
*/
@Test
public void testConnectToLocalTLSClusterTLSClient() throws Exception {
- // skip test
- if (useV2Protocol) {
- return;
- }
-
restartBookies(c -> {
c.setDisableServerSocketBind(true);
c.setEnableLocalTransport(true);
@@ -621,10 +616,6 @@ public class TestTLS extends BookKeeperClusterTestCase {
*/
@Test
public void testBookieAuthPluginRequireClientTLSAuthenticationLocal()
throws Exception {
- if (useV2Protocol) {
- return;
- }
-
restartBookies(c -> {
c.setBookieAuthProviderFactoryClass(
AllowOnlyClientsWithX509Certificates.class.getName());
@@ -755,6 +746,10 @@ public class TestTLS extends BookKeeperClusterTestCase {
testClient(clientConf, numBookies);
fail("Shouldn't be able to connect");
} catch (BKException.BKUnauthorizedAccessException authFailed) {
+ } catch (BKException.BKNotEnoughBookiesException
notEnoughBookiesException) {
+ if (!useV2Protocol) {
+ fail("Unexpected exception occurred.");
+ }
}
assertFalse(secureBookieSideChannel);