This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 17d25ea  [ISSUE #3651] for add netty channel option 
WRITE_BUFFER_WATER_MARK
17d25ea is described below

commit 17d25ea56b44625ae04e7c3181d24b0e80acaf77
Author: tianliuliu <[email protected]>
AuthorDate: Thu Dec 16 21:15:12 2021 +0800

    [ISSUE #3651] for add netty channel option WRITE_BUFFER_WATER_MARK
---
 .../rocketmq/remoting/netty/NettyClientConfig.java    | 19 +++++++++++++++++++
 .../rocketmq/remoting/netty/NettyRemotingClient.java  |  3 +++
 .../rocketmq/remoting/netty/NettyRemotingServer.java  |  3 +++
 .../rocketmq/remoting/netty/NettyServerConfig.java    | 18 ++++++++++++++++++
 .../rocketmq/remoting/netty/NettySystemConfig.java    |  8 ++++++++
 5 files changed, 51 insertions(+)

diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 5ba3534..c1b9345 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -40,6 +40,9 @@ public class NettyClientConfig {
 
     private boolean useTLS;
 
+    private int writeBufferHighWaterMark = 
NettySystemConfig.writeBufferHighWaterMark;
+    private int writeBufferLowWaterMark = 
NettySystemConfig.writeBufferLowWaterMark;
+
     public boolean isClientCloseSocketIfTimeout() {
         return clientCloseSocketIfTimeout;
     }
@@ -135,4 +138,20 @@ public class NettyClientConfig {
     public void setUseTLS(boolean useTLS) {
         this.useTLS = useTLS;
     }
+
+    public int getWriteBufferLowWaterMark() {
+        return writeBufferLowWaterMark;
+    }
+
+    public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+        this.writeBufferLowWaterMark = writeBufferLowWaterMark;
+    }
+
+    public int getWriteBufferHighWaterMark() {
+        return writeBufferHighWaterMark;
+    }
+
+    public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+        this.writeBufferHighWaterMark = writeBufferHighWaterMark;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 5ba6cfa..5697465 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -167,6 +168,8 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
nettyClientConfig.getConnectTimeoutMillis())
             .option(ChannelOption.SO_SNDBUF, 
nettyClientConfig.getClientSocketSndBufSize())
             .option(ChannelOption.SO_RCVBUF, 
nettyClientConfig.getClientSocketRcvBufSize())
+            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(),
+                nettyClientConfig.getWriteBufferHighWaterMark()))
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 1586472..2b7413f 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -204,6 +205,8 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 .childOption(ChannelOption.TCP_NODELAY, true)
                 .childOption(ChannelOption.SO_SNDBUF, 
nettyServerConfig.getServerSocketSndBufSize())
                 .childOption(ChannelOption.SO_RCVBUF, 
nettyServerConfig.getServerSocketRcvBufSize())
+                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(
+                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark()))
                 .localAddress(new 
InetSocketAddress(this.nettyServerConfig.getListenPort()))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index 8708471..bd87e5b 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -27,6 +27,8 @@ public class NettyServerConfig implements Cloneable {
 
     private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
     private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
+    private int writeBufferHighWaterMark = 
NettySystemConfig.writeBufferHighWaterMark;
+    private int writeBufferLowWaterMark = 
NettySystemConfig.writeBufferLowWaterMark;
     private int serverSocketBacklog = NettySystemConfig.socketBacklog;
     private boolean serverPooledByteBufAllocatorEnable = true;
 
@@ -139,4 +141,20 @@ public class NettyServerConfig implements Cloneable {
     public Object clone() throws CloneNotSupportedException {
         return (NettyServerConfig) super.clone();
     }
+
+    public int getWriteBufferLowWaterMark() {
+        return writeBufferLowWaterMark;
+    }
+
+    public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+        this.writeBufferLowWaterMark = writeBufferLowWaterMark;
+    }
+
+    public int getWriteBufferHighWaterMark() {
+        return writeBufferHighWaterMark;
+    }
+
+    public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+        this.writeBufferHighWaterMark = writeBufferHighWaterMark;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 85f30f5..4290bae 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -38,6 +38,10 @@ public class NettySystemConfig {
         "com.rocketmq.remoting.client.channel.maxIdleTimeSeconds";
     public static final String 
COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT =
         "com.rocketmq.remoting.client.closeSocketIfTimeout";
+    public static final String 
COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE =
+        "com.rocketmq.remoting.write.buffer.high.water.mark";
+    public static final String 
COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK =
+        "com.rocketmq.remoting.write.buffer.low.water.mark";
 
     public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
         
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE,
 "false"));
@@ -59,5 +63,9 @@ public class NettySystemConfig {
         
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CHANNEL_MAX_IDLE_SECONDS,
 "120"));
     public static boolean clientCloseSocketIfTimeout =
         
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT,
 "true"));
+    public static int writeBufferHighWaterMark =
+        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE,
 "4194304"));//4M
+    public static int writeBufferLowWaterMark =
+        
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK,
 "1048576")); //1MB
 
 }

Reply via email to