This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8fa9ca5  [ISSUE 3825] Use default 
SO_SNDBUF/SO_RCVBUF/WRITE_BUFFER_WATER_MARK value. (#3826)
8fa9ca5 is described below

commit 8fa9ca592ce526ffe191ec4e9c6889997e5bfc93
Author: huangli <[email protected]>
AuthorDate: Thu Feb 10 11:15:04 2022 +0800

    [ISSUE 3825] Use default SO_SNDBUF/SO_RCVBUF/WRITE_BUFFER_WATER_MARK value. 
(#3826)
    
    This commit may greatly increase thoughtput when network lantency is big or 
message size is big, especially for consumer.
    
    In one broker (M/S sync master async flush), one producer send test.
    
    250 byte body: 0.93x old version
    4000 byte body: 3.85x of old version
    16000 byte body: 6.07x of old version
    100000 byte body: 7.93x of old version
    
    The old version is 4.9.3-SNAPSHOT in this test. In this test the network 
latency from producer to broker is 1.61ms.
---
 .../java/org/apache/rocketmq/broker/BrokerStartup.java |  9 ---------
 .../apache/rocketmq/remoting/common/RemotingUtil.java  |  9 +++++++--
 .../rocketmq/remoting/netty/NettyRemotingClient.java   | 18 ++++++++++++++----
 .../rocketmq/remoting/netty/NettyRemotingServer.java   | 18 ++++++++++++++----
 .../rocketmq/remoting/netty/NettySystemConfig.java     |  8 ++++----
 .../org/apache/rocketmq/store/ha/HAConnection.java     |  9 +++++++--
 6 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index bb2eb91..19e618b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -32,7 +32,6 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
@@ -90,14 +89,6 @@ public class BrokerStartup {
     public static BrokerController createBrokerController(String[] args) {
         System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, 
Integer.toString(MQVersion.CURRENT_VERSION));
 
-        if (null == 
System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) 
{
-            NettySystemConfig.socketSndbufSize = 131072;
-        }
-
-        if (null == 
System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) 
{
-            NettySystemConfig.socketRcvbufSize = 131072;
-        }
-
         try {
             //PackageConflictDetect.detectFastjson();
             Options options = ServerUtil.buildCommandlineOptions(new 
Options());
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index 3914314..d5ce20b 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Enumeration;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 
 public class RemotingUtil {
     public static final String OS_NAME = System.getProperty("os.name");
@@ -193,8 +194,12 @@ public class RemotingUtil {
             sc.configureBlocking(true);
             sc.socket().setSoLinger(false, -1);
             sc.socket().setTcpNoDelay(true);
-            sc.socket().setReceiveBufferSize(1024 * 64);
-            sc.socket().setSendBufferSize(1024 * 64);
+            if (NettySystemConfig.socketSndbufSize > 0) {
+                
sc.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);
+            }
+            if (NettySystemConfig.socketRcvbufSize > 0) {
+                
sc.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);
+            }
             sc.socket().connect(remote, timeoutMillis);
             sc.configureBlocking(false);
             return sc;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index cbd17fa..5ced3b7 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -166,10 +166,6 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.SO_KEEPALIVE, false)
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
nettyClientConfig.getConnectTimeoutMillis())
-            .option(ChannelOption.SO_SNDBUF, 
nettyClientConfig.getClientSocketSndBufSize())
-            .option(ChannelOption.SO_RCVBUF, 
nettyClientConfig.getClientSocketRcvBufSize())
-            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(),
-                nettyClientConfig.getWriteBufferHighWaterMark()))
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
@@ -191,6 +187,20 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                         new NettyClientHandler());
                 }
             });
+        if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
+            log.info("client set SO_SNDBUF to {}", 
nettyClientConfig.getClientSocketSndBufSize());
+            handler.option(ChannelOption.SO_SNDBUF, 
nettyClientConfig.getClientSocketSndBufSize());
+        }
+        if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
+            log.info("client set SO_RCVBUF to {}", 
nettyClientConfig.getClientSocketRcvBufSize());
+            handler.option(ChannelOption.SO_RCVBUF, 
nettyClientConfig.getClientSocketRcvBufSize());
+        }
+        if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && 
nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
+            log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
+                    nettyClientConfig.getWriteBufferLowWaterMark(), 
nettyClientConfig.getWriteBufferHighWaterMark());
+            handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(
+                    nettyClientConfig.getWriteBufferLowWaterMark(), 
nettyClientConfig.getWriteBufferHighWaterMark()));
+        }
 
         this.timer.scheduleAtFixedRate(new TimerTask() {
             @Override
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 2b7413f..22440af 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -203,10 +203,6 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 .option(ChannelOption.SO_REUSEADDR, true)
                 .option(ChannelOption.SO_KEEPALIVE, false)
                 .childOption(ChannelOption.TCP_NODELAY, true)
-                .childOption(ChannelOption.SO_SNDBUF, 
nettyServerConfig.getServerSocketSndBufSize())
-                .childOption(ChannelOption.SO_RCVBUF, 
nettyServerConfig.getServerSocketRcvBufSize())
-                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(
-                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark()))
                 .localAddress(new 
InetSocketAddress(this.nettyServerConfig.getListenPort()))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
@@ -222,6 +218,20 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                             );
                     }
                 });
+        if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
+            log.info("server set SO_SNDBUF to {}", 
nettyServerConfig.getServerSocketSndBufSize());
+            childHandler.childOption(ChannelOption.SO_SNDBUF, 
nettyServerConfig.getServerSocketSndBufSize());
+        }
+        if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
+            log.info("server set SO_RCVBUF to {}", 
nettyServerConfig.getServerSocketRcvBufSize());
+            childHandler.childOption(ChannelOption.SO_RCVBUF, 
nettyServerConfig.getServerSocketRcvBufSize());
+        }
+        if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && 
nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
+            log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
+                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark());
+            childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
new WriteBufferWaterMark(
+                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark()));
+        }
 
         if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
             childHandler.childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT);
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 4290bae..edad844 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -50,9 +50,9 @@ public class NettySystemConfig {
     public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
         
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE,
 "65535"));
     public static int socketSndbufSize =
-        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, 
"65535"));
+        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, 
"0"));
     public static int socketRcvbufSize =
-        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, 
"65535"));
+        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, 
"0"));
     public static int socketBacklog =
         
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_BACKLOG, 
"1024"));
     public static int clientWorkerSize =
@@ -64,8 +64,8 @@ public class NettySystemConfig {
     public static boolean clientCloseSocketIfTimeout =
         
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT,
 "true"));
     public static int writeBufferHighWaterMark =
-        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE,
 "4194304"));//4M
+        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE,
 "0"));
     public static int writeBufferLowWaterMark =
-        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK,
 "1048576")); //1MB
+        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK,
 "0"));
 
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
index c08c515..4c26971 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
 public class HAConnection {
@@ -46,8 +47,12 @@ public class HAConnection {
         this.socketChannel.configureBlocking(false);
         this.socketChannel.socket().setSoLinger(false, -1);
         this.socketChannel.socket().setTcpNoDelay(true);
-        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
-        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        if (NettySystemConfig.socketSndbufSize > 0) {
+            
this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);
+        }
+        if (NettySystemConfig.socketRcvbufSize > 0) {
+            
this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);
+        }
         this.writeSocketService = new WriteSocketService(this.socketChannel);
         this.readSocketService = new ReadSocketService(this.socketChannel);
         this.haService.getConnectionCount().incrementAndGet();

Reply via email to