This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3c9a2df0081bf6d6c71ec521258da44c61942832 Author: Lari Hotari <[email protected]> AuthorDate: Tue Aug 6 16:46:14 2024 +0300 [improve][misc] Optimize TLS performance by omitting extra buffer copies (#23115) (cherry picked from commit 1db3c5fddce45919c6cac3b5a10030183eed3d5c) --- .../broker/service/PulsarChannelInitializer.java | 4 +-- .../client/impl/PulsarChannelInitializer.java | 6 +++-- .../apache/pulsar/common/protocol/ByteBufPair.java | 30 ++++++++++++++++++++++ 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index e276ea24fed..f15f6d67766 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -116,10 +116,8 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> } else { ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc())); } - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); - } else { - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(this.enableTls)); if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index ed34f7d41c1..dff423d19fb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -147,11 +148,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true)); // Setup channel except for the SsHandler for TLS enabled connections - ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER); + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(tlsEnabled)); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); - ch.pipeline().addLast("handler", clientCnxSupplier.get()); + ChannelHandler clientCnx = clientCnxSupplier.get(); + ch.pipeline().addLast("handler", clientCnx); } /** diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index cfd89d3bb28..6c4f42fcf88 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -107,9 +107,39 @@ public final class ByteBufPair extends AbstractReferenceCounted { return this; } + /** + * Encoder that writes a {@link ByteBufPair} to the socket. + * Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this. + */ + @Deprecated public static final Encoder ENCODER = new Encoder(); + + private static final boolean COPY_ENCODER_REQUIRED_FOR_TLS; + static { + boolean copyEncoderRequiredForTls = false; + try { + // io.netty.handler.ssl.SslHandlerCoalescingBufferQueue is only available in netty 4.1.111 and later + // when the class is available, there's no need to use the CopyingEncoder when TLS is enabled + ByteBuf.class.getClassLoader().loadClass("io.netty.handler.ssl.SslHandlerCoalescingBufferQueue"); + } catch (ClassNotFoundException e) { + copyEncoderRequiredForTls = true; + } + COPY_ENCODER_REQUIRED_FOR_TLS = copyEncoderRequiredForTls; + } + + /** + * Encoder that makes a copy of the ByteBufs before writing them to the socket. + * This is needed with Netty <4.1.111.Final when TLS is enabled, because the SslHandler will modify the input + * ByteBufs. + * Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this. + */ + @Deprecated public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder(); + public static ChannelOutboundHandlerAdapter getEncoder(boolean tlsEnabled) { + return tlsEnabled && COPY_ENCODER_REQUIRED_FOR_TLS ? COPYING_ENCODER : ENCODER; + } + @Sharable @SuppressWarnings("checkstyle:JavadocType") public static class Encoder extends ChannelOutboundHandlerAdapter {
