This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ba15416c126 [improve][proxy] Consolidate Netty channel flushes to 
mitigate syscall overhead (#16372)
ba15416c126 is described below

commit ba15416c126a178e2569dbead7bcaf41c777817b
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 5 14:29:04 2022 +0800

    [improve][proxy] Consolidate Netty channel flushes to mitigate syscall 
overhead (#16372)
    
    ### Motivation
    
    Follow-up change for https://github.com/apache/pulsar/pull/16361, as discussed in the comment at
    https://github.com/apache/pulsar/pull/16361#issuecomment-1173241406
    
    (cherry picked from commit 10db821e8e369efd6cba05eabda0e41ef346cab5)
---
 .../main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java   | 3 +++
 .../java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java | 3 +++
 2 files changed, 6 insertions(+)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 9edb16a5e65..ca9e6cdb2ae 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -37,6 +37,7 @@ import io.netty.handler.codec.haproxy.HAProxyCommand;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
 import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.ssl.SslProvider;
@@ -178,6 +179,8 @@ public class DirectProxyHandler {
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
+                ch.pipeline().addLast("consolidation", new 
FlushConsolidationHandler(1024,
+                        true));
                 if (tlsEnabledWithBroker) {
                     String host = targetBrokerAddress.getHostString();
                     int port = targetBrokerAddress.getPort();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 2fd0156155d..fc7c78d6a24 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.proxy.server;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.ssl.SslProvider;
@@ -91,6 +92,8 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
+        ch.pipeline().addLast("consolidation", new 
FlushConsolidationHandler(1024,
+                true));
         if (serverSslCtxRefresher != null && this.enableTls) {
             SslContext sslContext = serverSslCtxRefresher.get();
             if (sslContext != null) {

Reply via email to