[
https://issues.apache.org/jira/browse/ARTEMIS-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953158#comment-15953158
]
ASF GitHub Bot commented on ARTEMIS-1025:
-----------------------------------------
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1119#discussion_r109368909
--- Diff:
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
---
@@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object>
configuration,
this.listener = listener;
+ this.directDeliver = directDeliver;
+
this.batchingEnabled = batchingEnabled;
- this.directDeliver = directDeliver;
+ this.writeBufferHighWaterMark =
this.channel.config().getWriteBufferHighWaterMark();
+
+ this.batchLimit = batchingEnabled ?
Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
+ }
+
+ private static void waitFor(ChannelPromise promise, long millis) {
+ try {
+ final boolean completed = promise.await(millis);
+ if (!completed) {
+ ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
+ }
+ } catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
+ }
+ }
+
+ /**
+ * Returns an estimation of the current size of the write buffer in the
channel.
+ * To obtain a more precise value is necessary to use the unsafe API of
the channel to
+ * call the {@link
io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
+ * Anyway, both these values are subject to concurrent modifications.
+ */
+ private static int batchBufferSize(Channel channel, int
writeBufferHighWaterMark) {
+ //Channel::bytesBeforeUnwritable is performing a volatile load
+ //this is the reason why writeBufferHighWaterMark is passed as an
argument
+ final int bytesBeforeUnwritable = (int)
channel.bytesBeforeUnwritable();
+ assert bytesBeforeUnwritable >= 0;
+ final int writtenBytes = writeBufferHighWaterMark -
bytesBeforeUnwritable;
+ assert writtenBytes >= 0;
+ return writtenBytes;
+ }
+
+ /**
+ * When batching is not enabled, it tries to back-pressure the caller
thread.
+ * The back-pressure provided is not before the writeAndFlush request,
buf after it: too many threads that are not
+ * using {@link Channel#isWritable} to know when push unbatched data
will risk to cause OOM due to the enqueue of each own {@link
Channel#writeAndFlush} requests.
+ * Trying to provide back-pressure before the {@link
Channel#writeAndFlush} request could work, but in certain scenarios it will
block {@link Channel#isWritable} to be true.
+ */
+ private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf
bytes,
+ final int
readableBytes,
+ final Channel
channel,
+ final
ChannelPromise promise) {
+ final ChannelFuture future;
+ if (!channel.isWritable()) {
+ final ChannelPromise channelPromise = promise.isVoid() ?
channel.newPromise() : promise;
+ future = channel.writeAndFlush(bytes, channelPromise);
+ //is the channel is not writable wait the current request to be
flushed, providing backpressuring on the caller thread
+ if (!channel.isWritable() &&
!future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
--- End diff --
> on that case, we never block inside the NettyConnector... we verify
isWritable().. and stop writing..
I'm not 100 % sure it could work: it is counterintuitive, but not all the
pending writes are counted in the write buffer of Netty and having that buffer
writable doesn't mean that you can't go OOM :(
We'll talk about it tomorrow anyway :+1:
> OutOfDirectMemoryError raised from Netty
> ----------------------------------------
>
> Key: ARTEMIS-1025
> URL: https://issues.apache.org/jira/browse/ARTEMIS-1025
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: Broker
> Reporter: Francesco Nigro
> Assignee: Francesco Nigro
>
> If you send and receive a lot of messages in short time to Artemis via Netty
> connector, the OutOfDirectMemoryError exception is thrown from the client.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)