Repository: activemq-artemis
Updated Branches:
  refs/heads/master 73c79de8a -> 359592cf5
ARTEMIS-1098 Improve flow control while streaming large messages Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/da6b851c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/da6b851c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/da6b851c Branch: refs/heads/master Commit: da6b851c60329538f5f65ae83c9548c9bd0e40f9 Parents: 73c79de Author: Francesco Nigro <nigro....@gmail.com> Authored: Mon Apr 10 13:47:54 2017 +0200 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Mon Apr 10 13:58:33 2017 -0400 ---------------------------------------------------------------------- .../core/client/ActiveMQClientLogger.java | 5 ++ .../core/impl/ActiveMQSessionContext.java | 57 ++++++++++++-------- 2 files changed, 40 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/da6b851c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 0fe4a5a..748e508 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -310,6 +310,11 @@ public interface ActiveMQClientLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void broadcastGroupBindError(String hostAndPort); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 212057, value = "Large Message Streaming is taking too long to flush on back pressure.", + format = 
Message.Format.MESSAGE_FORMAT) + void timeoutStreamingLargeMessage(); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT) void onMessageError(@Cause Throwable e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/da6b851c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 6f92330..7799395 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -458,17 +459,7 @@ public class ActiveMQSessionContext extends SessionContext { byte[] chunk, int reconnectID, SendAcknowledgementHandler messageHandler) throws ActiveMQException { - final boolean requiresResponse = lastChunk && sendBlocking; - final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); - - if (requiresResponse) { - // When sending it blocking, only the last chunk will be blocking. 
- sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE); - } else { - sessionChannel.send(chunkPacket, reconnectID); - } - - return chunkPacket.getPacketSize(); + return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler); } @Override @@ -478,17 +469,7 @@ public class ActiveMQSessionContext extends SessionContext { boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { - final boolean requiresResponse = lastChunk && sendBlocking; - final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); - - if (requiresResponse) { - // When sending it blocking, only the last chunk will be blocking. - sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); - } else { - sessionChannel.send(chunkPacket); - } - - return chunkPacket.getPacketSize(); + return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler); } @Override @@ -813,6 +794,38 @@ public class ActiveMQSessionContext extends SessionContext { } } + private static int sendSessionSendContinuationMessage(Channel channel, + Message msgI, + long messageBodySize, + boolean sendBlocking, + boolean lastChunk, + byte[] chunk, + SendAcknowledgementHandler messageHandler) throws ActiveMQException { + final boolean requiresResponse = lastChunk && sendBlocking; + final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + final int expectedEncodeSize = chunkPacket.expectedEncodeSize(); + //perform a weak form of flow control to avoid OOM on tight loops + final CoreRemotingConnection connection = channel.getConnection(); + final long blockingCallTimeoutMillis = Math.max(0, 
connection.getBlockingCallTimeout()); + final long startFlowControl = System.nanoTime(); + final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis); + if (!isWritable) { + final long endFlowControl = System.nanoTime(); + final long elapsedFlowControl = endFlowControl - startFlowControl; + final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl); + ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage(); + logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]"); + } + if (requiresResponse) { + // When sending it blocking, only the last chunk will be blocking. + channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } else { + channel.send(chunkPacket); + } + return chunkPacket.getPacketSize(); + } + + class ClientSessionPacketHandler implements ChannelHandler { @Override