fixing paging & flow control on AMQP
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e85f755a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e85f755a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e85f755a Branch: refs/heads/artemis-1009 Commit: e85f755a0d128b4338d1c2c5911b42a35bc35787 Parents: 46be6a2 Author: Clebert Suconic <[email protected]> Authored: Tue Feb 28 21:01:05 2017 -0500 Committer: Clebert Suconic <[email protected]> Committed: Thu Mar 2 10:05:21 2017 -0500 ---------------------------------------------------------------------- .../activemq/artemis/api/core/Message.java | 6 ++++ .../artemis/core/message/impl/CoreMessage.java | 14 -------- .../protocol/amqp/broker/AMQPMessage.java | 35 ++++++++++++++++---- 3 files changed, 34 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e85f755a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 4a5381c..b266279 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -77,6 +77,12 @@ import org.apache.activemq.artemis.core.persistence.Persister; */ public interface Message { + // This is an estimate of how much memory a Message takes up, exclusing body and properties + // Note, it is only an estimate, it's not possible to be entirely sure with Java + // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof + // The value is somewhat higher on 64 bit architectures, probably due to different alignment + int memoryOffset = 352; + SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e85f755a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index fd09751..edbcaa9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -45,15 +45,8 @@ public class CoreMessage extends RefCountMessage { public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; - // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties - // Note, it is only an estimate, it's not possible to be entirely sure with Java - // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof - // The value is somewhat higher on 64 bit architectures, probably due to different alignment - private static final int memoryOffset = 352; - private volatile int memoryEstimate = -1; - private static final Logger logger = Logger.getLogger(CoreMessage.class); // There's an integer with the number of bytes for the body @@ -351,16 +344,9 @@ public class CoreMessage extends RefCountMessage { return this.properties; } - @Override public int getMemoryEstimate() { if (memoryEstimate == -1) { - if (buffer == null) { - new Exception("It is null").printStackTrace(); - } - if (properties == null) { - new Exception("Properties It is null").printStackTrace(); - } memoryEstimate = memoryOffset + (buffer != null ? buffer.capacity() : 0) + (properties != null ? properties.getMemoryOffset() : 0); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e85f755a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index ee2f870..c530c94 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Header; @@ -48,6 +49,8 @@ import org.apache.qpid.proton.util.TLSEncoder; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { + private volatile int memoryEstimate = -1; + final long messageFormat; private ProtonProtocolManager protocolManager; ByteBuf data; @@ -192,6 +195,9 @@ public class AMQPMessage extends RefCountMessage { } else { section = null; } + } else { + // meaning there is no header + headerEnd = 0; } if (!readApplicationProperties) { @@ -257,27 +263,26 @@ public class AMQPMessage extends RefCountMessage { this.data = null; } - // TODO-now this only make sense on Core @Override public ActiveMQBuffer getBodyBuffer() { + // NO-IMPL return null; } - // TODO-now this only make sense on Core @Override public ActiveMQBuffer getReadOnlyBodyBuffer() { + // NO-IMPL return null; } - // TODO: Refactor Large message @Override public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { + // NO-IMPL return null; } @Override public byte getType() { - // TODO-now: what to do here? return type; } @@ -309,7 +314,6 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message copy() { - // TODO-now: what to do with this? AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array(), protocolManager); return newEncode; } @@ -471,7 +475,19 @@ public class AMQPMessage extends RefCountMessage { // I would send a new instance of Header with a new delivery count, and only send partial of the buffer // previously received checkBuffer(); - buffer.writeBytes(data); + Header header = getHeader(); + if (header != null) { + synchronized (header) { + if (header.getDeliveryCount() != null) { + header.setDeliveryCount(UnsignedInteger.valueOf(header.getDeliveryCount().intValue() + 1)); + } else { + header.setDeliveryCount(UnsignedInteger.valueOf(1)); + } + TLSEncoder.getEncoder().setByteBuffer(new NettyWritable(buffer)); + TLSEncoder.getEncoder().writeObject(header); + } + } + buffer.writeBytes(data, headerEnd, data.writerIndex() - headerEnd); } @Override @@ -728,7 +744,12 @@ public class AMQPMessage extends RefCountMessage { @Override public int getMemoryEstimate() { - return 0; + if (memoryEstimate == -1) { + memoryEstimate = memoryOffset + + (data != null ? data.capacity() : 0); + } + + return memoryEstimate; } @Override
