This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c9a2df0081bf6d6c71ec521258da44c61942832
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Aug 6 16:46:14 2024 +0300

    [improve][misc] Optimize TLS performance by omitting extra buffer copies (#23115)
    
    (cherry picked from commit 1db3c5fddce45919c6cac3b5a10030183eed3d5c)
---
 .../broker/service/PulsarChannelInitializer.java   |  4 +--
 .../client/impl/PulsarChannelInitializer.java      |  6 +++--
 .../apache/pulsar/common/protocol/ByteBufPair.java | 30 ++++++++++++++++++++++
 3 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index e276ea24fed..f15f6d67766 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -116,10 +116,8 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
             } else {
                 ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
             }
-            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
-        } else {
-            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
+        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(this.enableTls));
 
         if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
             ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index ed34f7d41c1..dff423d19fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -147,11 +148,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
         ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true));
 
         // Setup channel except for the SsHandler for TLS enabled connections
-        ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
+        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(tlsEnabled));
 
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                 Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-        ch.pipeline().addLast("handler", clientCnxSupplier.get());
+        ChannelHandler clientCnx = clientCnxSupplier.get();
+        ch.pipeline().addLast("handler", clientCnx);
     }
 
    /**
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
index cfd89d3bb28..6c4f42fcf88 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
@@ -107,9 +107,39 @@ public final class ByteBufPair extends AbstractReferenceCounted {
         return this;
     }
 
+    /**
+     * Encoder that writes a {@link ByteBufPair} to the socket.
+     * Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this.
+     */
+    @Deprecated
     public static final Encoder ENCODER = new Encoder();
+
+    private static final boolean COPY_ENCODER_REQUIRED_FOR_TLS;
+    static {
+        boolean copyEncoderRequiredForTls = false;
+        try {
+            // io.netty.handler.ssl.SslHandlerCoalescingBufferQueue is only available in netty 4.1.111 and later
+            // when the class is available, there's no need to use the CopyingEncoder when TLS is enabled
+            ByteBuf.class.getClassLoader().loadClass("io.netty.handler.ssl.SslHandlerCoalescingBufferQueue");
+        } catch (ClassNotFoundException e) {
+            copyEncoderRequiredForTls = true;
+        }
+        COPY_ENCODER_REQUIRED_FOR_TLS = copyEncoderRequiredForTls;
+    }
+
+    /**
+     * Encoder that makes a copy of the ByteBufs before writing them to the socket.
+     * This is needed with Netty <4.1.111.Final when TLS is enabled, because the SslHandler will modify the input
+     * ByteBufs.
+     * Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this.
+     */
+    @Deprecated
     public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
 
+    public static ChannelOutboundHandlerAdapter getEncoder(boolean tlsEnabled) {
+        return tlsEnabled && COPY_ENCODER_REQUIRED_FOR_TLS ? COPYING_ENCODER : ENCODER;
+    }
+
     @Sharable
     @SuppressWarnings("checkstyle:JavadocType")
     public static class Encoder extends ChannelOutboundHandlerAdapter {

Reply via email to