This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 34e9fd6 Copy ByteBufPair buffers when using with SSL (#2401) (#2464) 34e9fd6 is described below commit 34e9fd6a441ccb5a895562c6872a7ce992cb39b9 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Sat Sep 1 00:39:31 2018 +0200 Copy ByteBufPair buffers when using with SSL (#2401) (#2464) The netty SSL handler uses a coalescing buffer queue, which modifies the buffers used to queue the writes so that SSL_write can be given larger chunks, thereby increasing the 'goodput'. If we pass in a retained duplicate as we have been doing until now, then later clients will be passed junk, as SSL will have modified cached entry buffers. This patch introduces a copying ByteBufPair encoder, which is only used with SSL connections. --- .../broker/service/PulsarChannelInitializer.java | 4 +++- .../apache/pulsar/client/impl/ConnectionPool.java | 5 +++-- .../org/apache/pulsar/common/api/ByteBufPair.java | 25 +++++++++++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 7858a11..a3fb772 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -57,9 +57,11 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(), serviceConfig.getTlsRequireTrustedClientCertOnConnect()); ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); + } else { + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); ch.pipeline().addLast("handler", new ServerCnx(pulsar)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index afba7d2..3188b35 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -100,9 +100,10 @@ public class ConnectionPool implements Closeable { conf.getTlsTrustCertsFilePath()); } ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); + } else { + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } - - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4)); ch.pipeline().addLast("handler", clientCnxSupplier.get()); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java index 94e1fb7..b99270b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java @@ -109,6 +109,7 @@ public final class ByteBufPair extends AbstractReferenceCounted { } public static final Encoder ENCODER = new Encoder(); + public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder(); @Sharable public static class Encoder extends ChannelOutboundHandlerAdapter { @@ -132,4 +133,26 @@ public final class ByteBufPair extends AbstractReferenceCounted { } } -} \ No newline at end of file + @Sharable + public static class CopyingEncoder extends ChannelOutboundHandlerAdapter { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof ByteBufPair) { + ByteBufPair b = (ByteBufPair) msg; + + // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler). + // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached + // for multiple requests. + try { + ctx.write(b.getFirst().copy(), ctx.voidPromise()); + ctx.write(b.getSecond().copy(), promise); + } finally { + ReferenceCountUtil.safeRelease(b); + } + } else { + ctx.write(msg, promise); + } + } + } + +}