Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1119#discussion_r109278189
--- Diff:
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
---
@@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object> configuration,
this.listener = listener;
+ this.directDeliver = directDeliver;
+
this.batchingEnabled = batchingEnabled;
- this.directDeliver = directDeliver;
+ this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
+
+ this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
+ }
+
+ private static void waitFor(ChannelPromise promise, long millis) {
+ try {
+ final boolean completed = promise.await(millis);
+ if (!completed) {
+ ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
+ }
+ } catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
+ }
+ }
+
+ /**
+ * Returns an estimation of the current size of the write buffer in the channel.
+ * To obtain a more precise value is necessary to use the unsafe API of the channel to
+ * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
+ * Anyway, both these values are subject to concurrent modifications.
+ */
+ private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
+ //Channel::bytesBeforeUnwritable is performing a volatile load
+ //this is the reason why writeBufferHighWaterMark is passed as an argument
+ final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
+ assert bytesBeforeUnwritable >= 0;
+ final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
+ assert writtenBytes >= 0;
+ return writtenBytes;
+ }
+
+ /**
+ * When batching is not enabled, it tries to back-pressure the caller thread.
+ * The back-pressure provided is not before the writeAndFlush request, buf after it: too many threads that are not
+ * using {@link Channel#isWritable} to know when push unbatched data will risk to cause OOM due to the enqueue of each own {@link Channel#writeAndFlush} requests.
+ * Trying to provide back-pressure before the {@link Channel#writeAndFlush} request could work, but in certain scenarios it will block {@link Channel#isWritable} to be true.
+ */
+ private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf bytes,
+ final int readableBytes,
+ final Channel channel,
+ final ChannelPromise promise) {
+ final ChannelFuture future;
+ if (!channel.isWritable()) {
+ final ChannelPromise channelPromise = promise.isVoid() ? channel.newPromise() : promise;
+ future = channel.writeAndFlush(bytes, channelPromise);
+ //is the channel is not writable wait the current request to be flushed, providing backpressuring on the caller thread
+ if (!channel.isWritable() && !future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
--- End diff --
I agree about the warning and about the reaction strategy: a sensible default,
customizable via connection properties, would be welcome.
I'm personally a fan of "kill the slowest connections", but I'm worried it
could lead to a storm of users complaining that, while sending 1
[EB](https://it.wikipedia.org/wiki/Exabyte) of messages, the broker strangely
died while screaming: ``"WARN: killed connection X - too slow; required 10 days
to send -9223372036854775808 bytes"`` :)
I'm joking, but I really am worried about adding new behaviours here that would
break (too much) the expectations of any piece of code that requests a write on
the network. Maybe an external job (running in the event loop) that monitors
the connections, detects the slowest ones (using the new methods I've added to
monitor the buffering state) and applies the configured countermeasures could
do the job, leaving ``NettyConnection`` simpler and with fewer
responsibilities, wdyt?
Just thinking out loud.
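
To make the "external job" idea a bit more concrete, here is a rough sketch in
plain Java of what one monitoring pass could look like. Everything in it is
hypothetical and only for discussion: ``MonitoredConnection``,
``SlowConnectionMonitor``, the byte threshold and the "N consecutive breaches"
rule are assumptions of mine, not existing Artemis or Netty types. A real
version would read the buffering state from the new monitoring methods and
would be scheduled on the event loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical read-only view of a connection's buffering state (not an Artemis type). */
interface MonitoredConnection {
   String id();

   /** Bytes currently queued for writing; assumed to be cheap to read. */
   long pendingWriteBytes();
}

/** Flags connections whose write buffer stays above a threshold for several passes in a row. */
final class SlowConnectionMonitor {
   private final long maxPendingBytes;
   private final int maxConsecutiveBreaches;
   private final Consumer<MonitoredConnection> countermeasure;
   // Not thread-safe: assumes every pass runs on the same (event loop) thread.
   private final Map<String, Integer> breaches = new HashMap<>();

   SlowConnectionMonitor(long maxPendingBytes,
                         int maxConsecutiveBreaches,
                         Consumer<MonitoredConnection> countermeasure) {
      this.maxPendingBytes = maxPendingBytes;
      this.maxConsecutiveBreaches = maxConsecutiveBreaches;
      this.countermeasure = countermeasure;
   }

   /** One monitoring pass; returns the connections the countermeasure was applied to. */
   List<MonitoredConnection> check(Iterable<? extends MonitoredConnection> connections) {
      final List<MonitoredConnection> flagged = new ArrayList<>();
      for (MonitoredConnection connection : connections) {
         if (connection.pendingWriteBytes() > maxPendingBytes) {
            final int count = breaches.merge(connection.id(), 1, Integer::sum);
            if (count >= maxConsecutiveBreaches) {
               breaches.remove(connection.id());
               countermeasure.accept(connection);
               flagged.add(connection);
            }
         } else {
            // The connection caught up, so forget its earlier breaches.
            breaches.remove(connection.id());
         }
      }
      return flagged;
   }
}

final class SlowConnectionMonitorDemo {
   public static void main(String[] args) {
      final MonitoredConnection stuck = new MonitoredConnection() {
         @Override
         public String id() {
            return "stuck-connection";
         }

         @Override
         public long pendingWriteBytes() {
            return 64 * 1024;
         }
      };
      // The configured countermeasure here is only a log line; it could be a close() instead.
      final SlowConnectionMonitor monitor = new SlowConnectionMonitor(
         32 * 1024, 3, c -> System.out.println("WARN: slow connection " + c.id()));
      for (int pass = 0; pass < 3; pass++) {
         monitor.check(Collections.singletonList(stuck));
      }
   }
}
```

The point of the sketch is only that the policy (warn, close, whatever gets
configured) lives outside ``NettyConnection``, which would just expose its
buffering state.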
---