[
https://issues.apache.org/jira/browse/AMQ-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stirling Chow updated AMQ-4116:
-------------------------------
Attachment: AMQ4116Test.java
Unit test demonstrating bug on VMTransport and how TCP transport does not
exhibit bug.
> Memory usage is not cleared from the source queue when a message is moved to
> another queue over the VMTransport
> ---------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-4116
> URL: https://issues.apache.org/jira/browse/AMQ-4116
> Project: ActiveMQ
> Issue Type: Bug
> Components: Transport
> Affects Versions: 5.7.0
> Reporter: Stirling Chow
> Attachments: AMQ4116Test.java
>
>
> Reproduction
> ============
> Using VMTransport:
> 1. Produce a message on queue A and verify that queue A's memory usage
> increases
> 2. Consume the message from queue A and verify that queue A's memory usage
> decreases.
> 3. Resend the message to queue B.
> Expected: Queue A's memory usage is not increased by the enqueue to queue B.
> Actual: Queue A's memory usage increases and no memory usage increase occurs
> on queue B.
> Symptom
> =======
> When messages are moved between queues using the VMTransport, they continue
> to contribute to the memory usage of the source queue rather than the
> destination queue.
> The correct behaviour (memory usage decreases from queue A and increases in
> queue B) is exhibited by non-VMTransport (e.g., TCP).
> Cause
> =====
> When the message is first sent to queue A, it's memoryUsage field is set to
> match queue A's:
> {code:title=org.apache.activemq.broker.region.Queue}
> public void send(final ProducerBrokerExchange producerExchange, final Message
> message) throws Exception {
> final ConnectionContext context = producerExchange.getConnectionContext();
> // There is delay between the client sending it and it arriving at the
> // destination.. it may have expired.
> message.setRegionDestination(this);
> ...
> {code}
> {code:title=org.apache.activemq.command.Message}
> public void
> setRegionDestination(org.apache.activemq.broker.region.Destination
> destination) {
> this.regionDestination = destination;
> if(this.memoryUsage==null) {
> this.memoryUsage=regionDestination.getMemoryUsage();
> }
> }
> {code}
> As the message moves across the transport, it is copied along with the
> memoryUsage field:
> {code:title=org.apache.activemq.command.Message}
> protected void copy(Message copy) {
> super.copy(copy);
> ...
> copy.memoryUsage=this.memoryUsage;
> ...
> {code}
> When the message is sent to the second queue, memoryUsage is non-null, so
> setRegionDestination(...) does not update memoryUsage to reflect the new
> destination queue.
> When the destination queue accepts the message, the memoryUsage of the source
> queue is (incorrectly) increased:
> {code:title=org.apache.activemq.command.Message}
> public int incrementReferenceCount() {
> int rc;
> int size;
> synchronized (this) {
> rc = ++referenceCount;
> size = getSize();
> }
> if (rc == 1 && getMemoryUsage() != null) {
> getMemoryUsage().increaseUsage(size);
> {code}
> This mal-behaviour is not exhibited by other transports since they serialize
> Message and memoryUsage is transient. As a result, the call to
> setRegionDestination(...) will properly update memoryUsage when the message
> arrives at the destination queue.
> Solution
> ========
> There are a number of possible solutions, any of which would correct the
> behaviour (although I am unsure what side-effects they may have on other
> behaviour):
> 1. It seems odd that memoryUsage is copied when Message is copied. If
> Message.copy(...) is used as a shortcut to avoid
> serialization/deserialization on VMTransport, then it should have the same
> semantics and avoid copying transient fields.
> 2. It seems odd that setRegionDestination(...) would not always set the
> memoryUsage to match the destination's memoryUsage.
> 3. ActiveMQConnection has a comment regarding concessions made for messages
> transmitted by the VM transport:
> {code:title=org.apache.activemq.ActiveMQConnection}
> public void onCommand(final Object o) {
> final Command command = (Command)o;
> if (!closed.get() && command != null) {
> try {
> command.visit(new CommandVisitorAdapter() {
> @Override
> public Response processMessageDispatch(MessageDispatch md)
> throws Exception {
> waitForTransportInterruptionProcessingToComplete();
> ActiveMQDispatcher dispatcher =
> dispatchers.get(md.getConsumerId());
> if (dispatcher != null) {
> // Copy in case a embedded broker is dispatching via
> // vm://
> // md.getMessage() == null to signal end of queue
> // browse.
> Message msg = md.getMessage();
> if (msg != null) {
> msg = msg.copy();
> msg.setReadOnlyBody(true);
> msg.setReadOnlyProperties(true);
>
> msg.setRedeliveryCounter(md.getRedeliveryCounter());
> msg.setConnection(ActiveMQConnection.this);
> md.setMessage(msg);
> }
> dispatcher.dispatch(md);
> }
> return null;
> }
> {code}
> Adding a call to msg.setMemoryUsage(null) would address this bug.
> The latter appears to be the least intrusive, although it will only address
> the case of VMTransport messages moving between producers/consumers. Queue
> contains shortcut methods for moving messages between queues (e.g.,
> copyMessageTo). I have not verified if these methods exhibit the same
> behaviour re: memory usage, but if so, they would not be addressed by
> patching ActiveMQConnection.
> Our main concern is with the reported use case, so I've attached a patch for
> ActiveMQConnection and unit test to demonstrate the behaviour.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira