This is an automated email from the ASF dual-hosted git repository.
huangli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8fa9ca5 [ISSUE 3825] Use default
SO_SNDBUF/SO_RCVBUF/WRITE_BUFFER_WATER_MARK value. (#3826)
8fa9ca5 is described below
commit 8fa9ca592ce526ffe191ec4e9c6889997e5bfc93
Author: huangli <[email protected]>
AuthorDate: Thu Feb 10 11:15:04 2022 +0800
[ISSUE 3825] Use default SO_SNDBUF/SO_RCVBUF/WRITE_BUFFER_WATER_MARK value.
(#3826)
This commit may greatly increase thoughtput when network lantency is big or
message size is big, especially for consumer.
In one broker (M/S sync master async flush), one producer send test.
250 byte body: 0.93x old version
4000 byte body: 3.85x of old version
16000 byte body: 6.07x of old version
100000 byte body: 7.93x of old version
The old version is 4.9.3-SNAPSHOT in this test. In this test the network
latency from producer to broker is 1.61ms.
---
.../java/org/apache/rocketmq/broker/BrokerStartup.java | 9 ---------
.../apache/rocketmq/remoting/common/RemotingUtil.java | 9 +++++++--
.../rocketmq/remoting/netty/NettyRemotingClient.java | 18 ++++++++++++++----
.../rocketmq/remoting/netty/NettyRemotingServer.java | 18 ++++++++++++++----
.../rocketmq/remoting/netty/NettySystemConfig.java | 8 ++++----
.../org/apache/rocketmq/store/ha/HAConnection.java | 9 +++++++--
6 files changed, 46 insertions(+), 25 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index bb2eb91..19e618b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -32,7 +32,6 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -90,14 +89,6 @@ public class BrokerStartup {
public static BrokerController createBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,
Integer.toString(MQVersion.CURRENT_VERSION));
- if (null ==
System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE))
{
- NettySystemConfig.socketSndbufSize = 131072;
- }
-
- if (null ==
System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE))
{
- NettySystemConfig.socketRcvbufSize = 131072;
- }
-
try {
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new
Options());
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index 3914314..d5ce20b 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Enumeration;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
public class RemotingUtil {
public static final String OS_NAME = System.getProperty("os.name");
@@ -193,8 +194,12 @@ public class RemotingUtil {
sc.configureBlocking(true);
sc.socket().setSoLinger(false, -1);
sc.socket().setTcpNoDelay(true);
- sc.socket().setReceiveBufferSize(1024 * 64);
- sc.socket().setSendBufferSize(1024 * 64);
+ if (NettySystemConfig.socketSndbufSize > 0) {
+
sc.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);
+ }
+ if (NettySystemConfig.socketRcvbufSize > 0) {
+
sc.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);
+ }
sc.socket().connect(remote, timeoutMillis);
sc.configureBlocking(false);
return sc;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index cbd17fa..5ced3b7 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -166,10 +166,6 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
nettyClientConfig.getConnectTimeoutMillis())
- .option(ChannelOption.SO_SNDBUF,
nettyClientConfig.getClientSocketSndBufSize())
- .option(ChannelOption.SO_RCVBUF,
nettyClientConfig.getClientSocketRcvBufSize())
- .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(),
- nettyClientConfig.getWriteBufferHighWaterMark()))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
@@ -191,6 +187,20 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
new NettyClientHandler());
}
});
+ if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
+ log.info("client set SO_SNDBUF to {}",
nettyClientConfig.getClientSocketSndBufSize());
+ handler.option(ChannelOption.SO_SNDBUF,
nettyClientConfig.getClientSocketSndBufSize());
+ }
+ if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
+ log.info("client set SO_RCVBUF to {}",
nettyClientConfig.getClientSocketRcvBufSize());
+ handler.option(ChannelOption.SO_RCVBUF,
nettyClientConfig.getClientSocketRcvBufSize());
+ }
+ if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 &&
nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
+ log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
+ nettyClientConfig.getWriteBufferLowWaterMark(),
nettyClientConfig.getWriteBufferHighWaterMark());
+ handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(
+ nettyClientConfig.getWriteBufferLowWaterMark(),
nettyClientConfig.getWriteBufferHighWaterMark()));
+ }
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 2b7413f..22440af 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -203,10 +203,6 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
- .childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize())
- .childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize())
- .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(
- nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new
InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
@@ -222,6 +218,20 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
);
}
});
+ if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
+ log.info("server set SO_SNDBUF to {}",
nettyServerConfig.getServerSocketSndBufSize());
+ childHandler.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize());
+ }
+ if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
+ log.info("server set SO_RCVBUF to {}",
nettyServerConfig.getServerSocketRcvBufSize());
+ childHandler.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize());
+ }
+ if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 &&
nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
+ log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
+ nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark());
+ childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(
+ nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()));
+ }
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 4290bae..edad844 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -50,9 +50,9 @@ public class NettySystemConfig {
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE,
"65535"));
public static int socketSndbufSize =
-
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE,
"65535"));
+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE,
"0"));
public static int socketRcvbufSize =
-
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE,
"65535"));
+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE,
"0"));
public static int socketBacklog =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_BACKLOG,
"1024"));
public static int clientWorkerSize =
@@ -64,8 +64,8 @@ public class NettySystemConfig {
public static boolean clientCloseSocketIfTimeout =
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT,
"true"));
public static int writeBufferHighWaterMark =
-
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE,
"4194304"));//4M
+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE,
"0"));
public static int writeBufferLowWaterMark =
-
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK,
"1048576")); //1MB
+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK,
"0"));
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
index c08c515..4c26971 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.store.SelectMappedBufferResult;
public class HAConnection {
@@ -46,8 +47,12 @@ public class HAConnection {
this.socketChannel.configureBlocking(false);
this.socketChannel.socket().setSoLinger(false, -1);
this.socketChannel.socket().setTcpNoDelay(true);
- this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
- this.socketChannel.socket().setSendBufferSize(1024 * 64);
+ if (NettySystemConfig.socketSndbufSize > 0) {
+
this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);
+ }
+ if (NettySystemConfig.socketRcvbufSize > 0) {
+
this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);
+ }
this.writeSocketService = new WriteSocketService(this.socketChannel);
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();