This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 34e9fd6  Copy ByteBufPair buffers when using with SSL (#2401) (#2464)
34e9fd6 is described below

commit 34e9fd6a441ccb5a895562c6872a7ce992cb39b9
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Sat Sep 1 00:39:31 2018 +0200

    Copy ByteBufPair buffers when using with SSL (#2401) (#2464)
    
    The netty SSL handler uses a coalescing buffer queue, which modifies
    the buffers used to queue the writes so that SSL_write can be given
    larger chunks, thereby increasing the 'goodput'.
    
    If we pass in a retained duplicate, as we have been doing until now,
    then later clients will be passed junk, as the SSL handler will have
    modified the cached entry buffers.
    
    This patch introduces a copying ByteBufPair encoder, which is only
    used with SSL connections.
---
 .../broker/service/PulsarChannelInitializer.java   |  4 +++-
 .../apache/pulsar/client/impl/ConnectionPool.java  |  5 +++--
 .../org/apache/pulsar/common/api/ByteBufPair.java  | 25 +++++++++++++++++++++-
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 7858a11..a3fb772 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -57,9 +57,11 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
                     serviceConfig.getTlsCiphers(), 
serviceConfig.getTlsProtocols(),
                     serviceConfig.getTlsRequireTrustedClientCertOnConnect());
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+            ch.pipeline().addLast("ByteBufPairEncoder", 
ByteBufPair.COPYING_ENCODER);
+        } else {
+            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
-        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", new ServerCnx(pulsar));
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index afba7d2..3188b35 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -100,9 +100,10 @@ public class ConnectionPool implements Closeable {
                                 conf.getTlsTrustCertsFilePath());
                     }
                     ch.pipeline().addLast(TLS_HANDLER, 
sslCtx.newHandler(ch.alloc()));
+                    ch.pipeline().addLast("ByteBufPairEncoder", 
ByteBufPair.COPYING_ENCODER);
+                } else {
+                    ch.pipeline().addLast("ByteBufPairEncoder", 
ByteBufPair.ENCODER);
                 }
-
-                ch.pipeline().addLast("ByteBufPairEncoder", 
ByteBufPair.ENCODER);
                 ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
                 ch.pipeline().addLast("handler", clientCnxSupplier.get());
             }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
index 94e1fb7..b99270b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
@@ -109,6 +109,7 @@ public final class ByteBufPair extends 
AbstractReferenceCounted {
     }
 
     public static final Encoder ENCODER = new Encoder();
+    public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
 
     @Sharable
     public static class Encoder extends ChannelOutboundHandlerAdapter {
@@ -132,4 +133,26 @@ public final class ByteBufPair extends 
AbstractReferenceCounted {
         }
     }
 
-}
\ No newline at end of file
+    @Sharable
+    public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
+        @Override
+        public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise) throws Exception {
+            if (msg instanceof ByteBufPair) {
+                ByteBufPair b = (ByteBufPair) msg;
+
+                // Some handlers in the pipeline will modify the bytebufs 
passed in to them (e.g. SslHandler).
+                // For these handlers, we need to pass a copy of the buffers 
as the source buffers may be cached
+                // for multiple requests.
+                try {
+                    ctx.write(b.getFirst().copy(), ctx.voidPromise());
+                    ctx.write(b.getSecond().copy(), promise);
+                } finally {
+                    ReferenceCountUtil.safeRelease(b);
+                }
+            } else {
+                ctx.write(msg, promise);
+            }
+        }
+    }
+
+}

Reply via email to