This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1db3c5fddce [improve][misc] Optimize TLS performance by omitting extra
buffer copies (#23115)
1db3c5fddce is described below
commit 1db3c5fddce45919c6cac3b5a10030183eed3d5c
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Aug 6 16:46:14 2024 +0300
[improve][misc] Optimize TLS performance by omitting extra buffer copies
(#23115)
---
.../broker/service/PulsarChannelInitializer.java | 4 +--
.../client/impl/PulsarChannelInitializer.java | 6 +++--
.../apache/pulsar/common/protocol/ByteBufPair.java | 30 ++++++++++++++++++++++
3 files changed, 35 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index e276ea24fed..f15f6d67766 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -116,10 +116,8 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
} else {
ch.pipeline().addLast(TLS_HANDLER,
sslCtxRefresher.get().newHandler(ch.alloc()));
}
- ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.COPYING_ENCODER);
- } else {
- ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
+ ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.getEncoder(this.enableTls));
if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new
OptionalProxyProtocolDecoder());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index ed34f7d41c1..dff423d19fb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -147,11 +148,12 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
ch.pipeline().addLast("consolidation", new
FlushConsolidationHandler(1024, true));
// Setup channel except for the SsHandler for TLS enabled connections
- ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ?
ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
+ ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.getEncoder(tlsEnabled));
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
- ch.pipeline().addLast("handler", clientCnxSupplier.get());
+ ChannelHandler clientCnx = clientCnxSupplier.get();
+ ch.pipeline().addLast("handler", clientCnx);
}
/**
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
index cfd89d3bb28..6c4f42fcf88 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java
@@ -107,9 +107,39 @@ public final class ByteBufPair extends
AbstractReferenceCounted {
return this;
}
+ /**
+ * Encoder that writes a {@link ByteBufPair} to the socket.
+ * Use {@link #getEncoder(boolean)} to get the appropriate encoder instead
of referencing this.
+ */
+ @Deprecated
public static final Encoder ENCODER = new Encoder();
+
+ private static final boolean COPY_ENCODER_REQUIRED_FOR_TLS;
+ static {
+ boolean copyEncoderRequiredForTls = false;
+ try {
+ // io.netty.handler.ssl.SslHandlerCoalescingBufferQueue is only
available in netty 4.1.111 and later
+ // when the class is available, there's no need to use the
CopyingEncoder when TLS is enabled
+
ByteBuf.class.getClassLoader().loadClass("io.netty.handler.ssl.SslHandlerCoalescingBufferQueue");
+ } catch (ClassNotFoundException e) {
+ copyEncoderRequiredForTls = true;
+ }
+ COPY_ENCODER_REQUIRED_FOR_TLS = copyEncoderRequiredForTls;
+ }
+
+ /**
+ * Encoder that makes a copy of the ByteBufs before writing them to the
socket.
+ * This is needed with Netty <4.1.111.Final when TLS is enabled, because
the SslHandler will modify the input
+ * ByteBufs.
+ * Use {@link #getEncoder(boolean)} to get the appropriate encoder instead
of referencing this.
+ */
+ @Deprecated
public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
+ public static ChannelOutboundHandlerAdapter getEncoder(boolean tlsEnabled)
{
+ return tlsEnabled && COPY_ENCODER_REQUIRED_FOR_TLS ? COPYING_ENCODER :
ENCODER;
+ }
+
@Sharable
@SuppressWarnings("checkstyle:JavadocType")
public static class Encoder extends ChannelOutboundHandlerAdapter {