Repository: activemq-artemis Updated Branches: refs/heads/master f798178c6 -> 2bcc255f4
ARTEMIS-1036 Streaming huge messages would cause OME Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/759d3b78 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/759d3b78 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/759d3b78 Branch: refs/heads/master Commit: 759d3b78d98001e3709fdca94676d021909eb328 Parents: f798178 Author: Francesco Nigro <[email protected]> Authored: Wed Mar 15 16:59:57 2017 +0100 Committer: Clebert Suconic <[email protected]> Committed: Thu Mar 23 10:42:44 2017 -0400 ---------------------------------------------------------------------- .../wireformat/SessionContinuationMessage.java | 29 +++++++++++++++++++- .../SessionReceiveContinuationMessage.java | 9 +++++- .../SessionSendContinuationMessage.java | 8 +++++- 3 files changed, 43 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/759d3b78/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java index fcdd943..a57cdb4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java @@ -18,8 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import java.util.Arrays; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; public abstract class SessionContinuationMessage extends PacketImpl { @@ -61,6 +64,30 @@ public abstract class SessionContinuationMessage extends PacketImpl { return continues; } + /** + * Returns the exact expected encoded size of {@code this} packet. + * It will be used to allocate the proper encoding buffer in {@link #createPacket}, hence any + * wrong value will result in a thrown exception or a resize of the encoding + * buffer during the encoding process, depending to the implementation of {@link #createPacket}. + * Any child of {@code this} class are required to override this method if their encoded size is changed + * from the base class. + * + * @return the size in bytes of the expected encoded packet + */ + protected int expectedEncodedSize() { + return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length); + } + + @Override + protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { + final int expectedEncodedSize = expectedEncodedSize(); + if (connection == null) { + return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize)); + } else { + return connection.createTransportBuffer(expectedEncodedSize, usePooled); + } + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(body.length); @@ -110,4 +137,4 @@ public abstract class SessionContinuationMessage extends PacketImpl { return true; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/759d3b78/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java index 9141ae1..44ad1bb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java @@ -67,6 +67,13 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag return consumerID; } + // Protected ----------------------------------------------------- + + @Override + protected final int expectedEncodedSize() { + return super.expectedEncodedSize() + DataConstants.SIZE_LONG; + } + // Public -------------------------------------------------------- @Override @@ -121,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag return true; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/759d3b78/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index 0ecfe33..1c600e9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.utils.DataConstants; /** * A SessionSendContinuationMessage<br> @@ -92,6 +93,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { } @Override + protected final int expectedEncodedSize() { + return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN; + } + + @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); if (!continues) { @@ -154,4 +160,4 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { public SendAcknowledgementHandler getHandler() { return handler; } -} +} \ No newline at end of file
