This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 17d25ea [ISSUE #3651] for add netty channel option
WRITE_BUFFER_WATER_MARK
17d25ea is described below
commit 17d25ea56b44625ae04e7c3181d24b0e80acaf77
Author: tianliuliu <[email protected]>
AuthorDate: Thu Dec 16 21:15:12 2021 +0800
[ISSUE #3651] for add netty channel option WRITE_BUFFER_WATER_MARK
---
.../rocketmq/remoting/netty/NettyClientConfig.java | 19 +++++++++++++++++++
.../rocketmq/remoting/netty/NettyRemotingClient.java | 3 +++
.../rocketmq/remoting/netty/NettyRemotingServer.java | 3 +++
.../rocketmq/remoting/netty/NettyServerConfig.java | 18 ++++++++++++++++++
.../rocketmq/remoting/netty/NettySystemConfig.java | 8 ++++++++
5 files changed, 51 insertions(+)
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 5ba3534..c1b9345 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -40,6 +40,9 @@ public class NettyClientConfig {
private boolean useTLS;
+ private int writeBufferHighWaterMark =
NettySystemConfig.writeBufferHighWaterMark;
+ private int writeBufferLowWaterMark =
NettySystemConfig.writeBufferLowWaterMark;
+
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
@@ -135,4 +138,20 @@ public class NettyClientConfig {
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
+
+ public int getWriteBufferLowWaterMark() {
+ return writeBufferLowWaterMark;
+ }
+
+ public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+ this.writeBufferLowWaterMark = writeBufferLowWaterMark;
+ }
+
+ public int getWriteBufferHighWaterMark() {
+ return writeBufferHighWaterMark;
+ }
+
+ public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+ this.writeBufferHighWaterMark = writeBufferHighWaterMark;
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 5ba6cfa..5697465 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -167,6 +168,8 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF,
nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF,
nettyClientConfig.getClientSocketRcvBufSize())
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(),
+ nettyClientConfig.getWriteBufferHighWaterMark()))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 1586472..2b7413f 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -204,6 +205,8 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize())
+ .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(
+ nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new
InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index 8708471..bd87e5b 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -27,6 +27,8 @@ public class NettyServerConfig implements Cloneable {
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
+ private int writeBufferHighWaterMark =
NettySystemConfig.writeBufferHighWaterMark;
+ private int writeBufferLowWaterMark =
NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
private boolean serverPooledByteBufAllocatorEnable = true;
@@ -139,4 +141,20 @@ public class NettyServerConfig implements Cloneable {
public Object clone() throws CloneNotSupportedException {
return (NettyServerConfig) super.clone();
}
+
+ public int getWriteBufferLowWaterMark() {
+ return writeBufferLowWaterMark;
+ }
+
+ public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+ this.writeBufferLowWaterMark = writeBufferLowWaterMark;
+ }
+
+ public int getWriteBufferHighWaterMark() {
+ return writeBufferHighWaterMark;
+ }
+
+ public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+ this.writeBufferHighWaterMark = writeBufferHighWaterMark;
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 85f30f5..4290bae 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -38,6 +38,10 @@ public class NettySystemConfig {
"com.rocketmq.remoting.client.channel.maxIdleTimeSeconds";
public static final String
COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT =
"com.rocketmq.remoting.client.closeSocketIfTimeout";
+ public static final String
COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE =
+ "com.rocketmq.remoting.write.buffer.high.water.mark";
+ public static final String
COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK =
+ "com.rocketmq.remoting.write.buffer.low.water.mark";
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE,
"false"));
@@ -59,5 +63,9 @@ public class NettySystemConfig {
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CHANNEL_MAX_IDLE_SECONDS,
"120"));
public static boolean clientCloseSocketIfTimeout =
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT,
"true"));
+ public static int writeBufferHighWaterMark =
+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE,
"4194304")); //4MB
+ public static int writeBufferLowWaterMark =
+
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK,
"1048576")); //1MB
}