This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e0097df Copy ByteBufPair buffers when using with SSL (#2401) (#2464)
e0097df is described below
commit e0097dfc651dbf20eb067889113711c908692b5f
Author: Ivan Kelly <[email protected]>
AuthorDate: Sat Sep 1 00:39:31 2018 +0200
Copy ByteBufPair buffers when using with SSL (#2401) (#2464)
The netty SSL handler uses a coalescing buffer queue, which modifies
the buffers used to queue the writes so that SSL_write can be given
larger chunks, thereby increasing the 'goodput'.
If we pass in a retained duplicate as we have been doing until now,
then later clients will be passed junk, as SSL will have modified cached
entry buffers.
This patch introduces a copying ByteBufPair encoder, which is only
used with SSL connections.
---
.../broker/service/PulsarChannelInitializer.java | 4 +++-
.../apache/pulsar/client/impl/ConnectionPool.java | 5 +++--
.../org/apache/pulsar/common/api/ByteBufPair.java | 25 +++++++++++++++++++++-
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 8c16a55..bcb6473 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -56,9 +56,11 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
serviceConfig.getTlsCiphers(),
serviceConfig.getTlsProtocols(),
serviceConfig.getTlsRequireTrustedClientCertOnConnect());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+ ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.COPYING_ENCODER);
+ } else {
+ ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
- ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ServerCnx(brokerService));
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index afba7d2..3188b35 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -100,9 +100,10 @@ public class ConnectionPool implements Closeable {
conf.getTlsTrustCertsFilePath());
}
ch.pipeline().addLast(TLS_HANDLER,
sslCtx.newHandler(ch.alloc()));
+ ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.COPYING_ENCODER);
+ } else {
+ ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.ENCODER);
}
-
- ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
index 94e1fb7..b99270b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
@@ -109,6 +109,7 @@ public final class ByteBufPair extends
AbstractReferenceCounted {
}
public static final Encoder ENCODER = new Encoder();
+ public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
@Sharable
public static class Encoder extends ChannelOutboundHandlerAdapter {
@@ -132,4 +133,26 @@ public final class ByteBufPair extends
AbstractReferenceCounted {
}
}
-}
\ No newline at end of file
+ @Sharable
+ public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
+ if (msg instanceof ByteBufPair) {
+ ByteBufPair b = (ByteBufPair) msg;
+
+ // Some handlers in the pipeline will modify the bytebufs
passed in to them (i.e. SslHandler).
+ // For these handlers, we need to pass a copy of the buffers
as the source buffers may be cached
+ // for multiple requests.
+ try {
+ ctx.write(b.getFirst().copy(), ctx.voidPromise());
+ ctx.write(b.getSecond().copy(), promise);
+ } finally {
+ ReferenceCountUtil.safeRelease(b);
+ }
+ } else {
+ ctx.write(msg, promise);
+ }
+ }
+ }
+
+}