Repository: activemq-artemis Updated Branches: refs/heads/master 2edc972c5 -> d0ae3f25a
ARTEMIS-1095 Netty's WriteBufferWaterMark configuration via TransportConstants Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f53449b9 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f53449b9 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f53449b9 Branch: refs/heads/master Commit: f53449b945d2c7ddbd185063cd1ece051ed93990 Parents: 2edc972 Author: Francesco Nigro <[email protected]> Authored: Thu Apr 6 11:33:29 2017 +0200 Committer: Francesco Nigro <[email protected]> Committed: Thu Apr 6 15:26:13 2017 +0200 ---------------------------------------------------------------------- .../core/remoting/impl/netty/NettyConnector.java | 12 +++++++++++- .../core/remoting/impl/netty/TransportConstants.java | 12 ++++++++++++ .../artemis/core/remoting/impl/netty/NettyAcceptor.java | 12 +++++++++++- docs/user-manual/en/configuring-transports.md | 11 +++++++++++ 4 files changed, 45 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f53449b9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index f6935be..d31bdb2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -58,6 +58,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import 
io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; @@ -211,6 +212,10 @@ public class NettyConnector extends AbstractConnector { private int tcpReceiveBufferSize; + private final int writeBufferLowWaterMark; + + private final int writeBufferHighWaterMark; + private long batchDelay; private ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<>(); @@ -336,7 +341,8 @@ public class NettyConnector extends AbstractConnector { tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration); tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration); tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration); - + this.writeBufferLowWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK, configuration); + this.writeBufferHighWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK, configuration); batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY, configuration); connectTimeoutMillis = ConfigurationHelper.getIntProperty(TransportConstants.NETTY_CONNECT_TIMEOUT, TransportConstants.DEFAULT_NETTY_CONNECT_TIMEOUT, configuration); @@ -423,6 +429,10 @@ public class NettyConnector extends AbstractConnector { if (tcpSendBufferSize != -1) { bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize); } + final 
int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low(); + final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high(); + final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark, writeBufferHighWaterMark); + bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true); channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f53449b9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 4317a68..69eaa94 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -125,6 +125,10 @@ public class TransportConstants { */ public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads"; + public static final String WRITE_BUFFER_LOW_WATER_MARK_PROPNAME = "writeBufferLowWaterMark"; + + public static final String WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME = "writeBufferHighWaterMark"; + public static final String REMOTING_THREADS_PROPNAME = "RemotingThreads"; public static final String BATCH_DELAY = "batchDelay"; @@ -183,6 +187,10 @@ public class TransportConstants { public static final int 
DEFAULT_TCP_RECEIVEBUFFER_SIZE = 1024 * 1024; + public static final int DEFAULT_WRITE_BUFFER_LOW_WATER_MARK = 32 * 1024; + + public static final int DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK = 128 * 1024; + public static final boolean DEFAULT_HTTP_ENABLED = false; public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500; @@ -253,6 +261,8 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME); allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME); allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); allowableAcceptorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME); allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY); @@ -303,6 +313,8 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME); allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); + allowableConnectorKeys.add(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME); + allowableConnectorKeys.add(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME); allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); allowableConnectorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME); allowableConnectorKeys.add(TransportConstants.BATCH_DELAY); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f53449b9/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 7b17694..4f248f5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -44,6 +44,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; @@ -161,6 +162,10 @@ public class NettyAcceptor extends AbstractAcceptor { private final int tcpReceiveBufferSize; + private final int writeBufferLowWaterMark; + + private final int writeBufferHighWaterMark; + private int remotingThreads; private final ConcurrentMap<Object, NettyServerConnection> connections = new ConcurrentHashMap<>(); @@ -260,7 +265,8 @@ public class NettyAcceptor extends AbstractAcceptor { tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration); tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration); tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration); - + this.writeBufferLowWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK, configuration); + this.writeBufferHighWaterMark = 
ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK, configuration); this.scheduledThreadPool = scheduledThreadPool; batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY, configuration); @@ -341,6 +347,10 @@ public class NettyAcceptor extends AbstractAcceptor { if (tcpSendBufferSize != -1) { bootstrap.childOption(ChannelOption.SO_SNDBUF, tcpSendBufferSize); } + final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low(); + final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high(); + final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark, writeBufferHighWaterMark); + bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark); if (backlog != -1) { bootstrap.option(ChannelOption.SO_BACKLOG, backlog); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f53449b9/docs/user-manual/en/configuring-transports.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/configuring-transports.md b/docs/user-manual/en/configuring-transports.md index eec4e09..d3adfef 100644 --- a/docs/user-manual/en/configuring-transports.md +++ b/docs/user-manual/en/configuring-transports.md @@ -239,6 +239,17 @@ Netty for simple TCP: - `tcpReceiveBufferSize`. This parameter determines the size of the TCP receive buffer in bytes. The default value for this property is `32768` bytes (32KiB). + +- `writeBufferLowWaterMark`. This parameter determines the low water mark of + the Netty write buffer. 
Once the number of bytes queued in the write buffer has exceeded + the high water mark and then dropped below this value, Netty's channel + will become writable again. The default value for this property is + `32768` bytes (32KiB). + +- `writeBufferHighWaterMark`. This parameter determines the high water mark of + the Netty write buffer. If the number of bytes queued in the write buffer exceeds + this value, Netty's channel will become non-writable. The default value for + this property is `131072` bytes (128KiB). - `batchDelay`. Before writing packets to the transport, Apache ActiveMQ Artemis can be configured to batch up writes for a maximum of `batchDelay`
