This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ba15416c126 [improve][proxy] Consolidate Netty channel flushes to
mitigate syscall overhead (#16372)
ba15416c126 is described below
commit ba15416c126a178e2569dbead7bcaf41c777817b
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 5 14:29:04 2022 +0800
[improve][proxy] Consolidate Netty channel flushes to mitigate syscall
overhead (#16372)
### Motivation
Follow-up change for https://github.com/apache/pulsar/pull/16361, as
discussed in the comment at https://github.com/apache/pulsar/pull/16361#issuecomment-1173241406
(cherry picked from commit 10db821e8e369efd6cba05eabda0e41ef346cab5)
---
.../main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java | 3 +++
.../java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java | 3 +++
2 files changed, 6 insertions(+)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 9edb16a5e65..ca9e6cdb2ae 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -37,6 +37,7 @@ import io.netty.handler.codec.haproxy.HAProxyCommand;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
@@ -178,6 +179,8 @@ public class DirectProxyHandler {
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
+ ch.pipeline().addLast("consolidation", new
FlushConsolidationHandler(1024,
+ true));
if (tlsEnabledWithBroker) {
String host = targetBrokerAddress.getHostString();
int port = targetBrokerAddress.getPort();
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 2fd0156155d..fc7c78d6a24 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.proxy.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
@@ -91,6 +92,8 @@ public class ServiceChannelInitializer extends
ChannelInitializer<SocketChannel>
@Override
protected void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast("consolidation", new
FlushConsolidationHandler(1024,
+ true));
if (serverSslCtxRefresher != null && this.enableTls) {
SslContext sslContext = serverSslCtxRefresher.get();
if (sslContext != null) {