[ 
https://issues.apache.org/jira/browse/AMQ-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stirling Chow updated AMQ-4116:
-------------------------------

    Description: 
Reproduction
============
Using VMTransport:

1. Produce a message on queue A and verify that queue A's memory usage increases
2. Consume the message from queue A and verify that queue A's memory usage 
decreases.
3. Resend the message to queue B.

Expected: Queue A's memory usage is not increased by the enqueue to queue B.
Actual: Queue A's memory usage increases and no memory usage increase occurs on 
queue B.

Symptom
=======
When messages are moved between queues using the VMTransport, they continue to 
contribute to the memory usage of the source queue rather than the destination 
queue.

The correct behaviour (memory usage decreases from queue A and increases in 
queue B) is exhibited by non-VMTransport (e.g., TCP).

Cause
=====
When the message is first sent to queue A, it's memoryUsage field is set to 
match queue A's:

{code:title=org.apache.activemq.broker.region.Queue}
public void send(final ProducerBrokerExchange producerExchange, final Message 
message) throws Exception {
    final ConnectionContext context = producerExchange.getConnectionContext();
    // There is delay between the client sending it and it arriving at the
    // destination.. it may have expired.
    message.setRegionDestination(this);
...
{code}

{code:title=org.apache.activemq.command.Message}
public void setRegionDestination(org.apache.activemq.broker.region.Destination 
destination) {
    this.regionDestination = destination;
    if(this.memoryUsage==null) {
        this.memoryUsage=regionDestination.getMemoryUsage();
    }
}
{code}

As the message moves across the transport, it is copied along with the 
memoryUsage field:

{code:title=org.apache.activemq.command.Message}
protected void copy(Message copy) {
    super.copy(copy);
...
    copy.memoryUsage=this.memoryUsage;
...
{code}

When the message is sent to the second queue, memoryUsage is non-null, so 
setRegionDestination(...) does not update memoryUsage to reflect the new 
destination queue.

When the destination queue accepts the message, the memoryUsage of the source 
queue is (incorrectly) increased:

{code:title=org.apache.activemq.command.Message}
public int incrementReferenceCount() {
    int rc;
    int size;
    synchronized (this) {
        rc = ++referenceCount;
        size = getSize();
    }

    if (rc == 1 && getMemoryUsage() != null) {
        getMemoryUsage().increaseUsage(size);
{code}

This mal-behaviour is not exhibited by other transports since they serialize 
Message and memoryUsage is transient.  As a result, the call to 
setRegionDestination(...) will properly update memoryUsage when the message 
arrives at the destination queue.

Solution
========
There are a number of possible solutions, any of which would correct the 
behaviour (although I am unsure what side-effects they may have on other 
behaviour):

1. It seems odd that memoryUsage is copied when Message is copied.  If 
Message.copy(...) is used as a shortcut to avoid serialization/deserialization 
on VMTransport, then it should have the same semantics and avoid copying 
transient fields.

2. It seems odd that setRegionDestination(...) would not always set the 
memoryUsage to match the destination's memoryUsage.

3. ActiveMQConnection has a comment regarding concessions made for messages 
transmitted by the VM transport:

{code:title=org.apache.activemq.ActiveMQConnection}
public void onCommand(final Object o) {
    final Command command = (Command)o;
    if (!closed.get() && command != null) {
        try {
            command.visit(new CommandVisitorAdapter() {
                @Override
                public Response processMessageDispatch(MessageDispatch md) 
throws Exception {
                    waitForTransportInterruptionProcessingToComplete();
                    ActiveMQDispatcher dispatcher = 
dispatchers.get(md.getConsumerId());
                    if (dispatcher != null) {
                        // Copy in case a embedded broker is dispatching via
                        // vm://
                        // md.getMessage() == null to signal end of queue
                        // browse.
                        Message msg = md.getMessage();
                        if (msg != null) {
                            msg = msg.copy();
                            msg.setReadOnlyBody(true);
                            msg.setReadOnlyProperties(true);
                            msg.setRedeliveryCounter(md.getRedeliveryCounter());
                            msg.setConnection(ActiveMQConnection.this);
                            md.setMessage(msg);
                        }
                        dispatcher.dispatch(md);
                    }
                    return null;
                }
{code}

Adding a call to msg.setMemoryUsage(null) would address this bug.

The latter appears to be the least intrusive, although it will only address the 
case of VMTransport messages moving between producers/consumers. Queue contains 
shortcut methods for moving messages between queues (e.g., copyMessageTo).  I 
have not verified if these methods exhibit the same behaviour re: memory usage, 
but if so, they would not be addressed by patching ActiveMQConnection.

Our main concern is with the reported use case, so I've attached a patch for 
ActiveMQConnection and unit test to demonstrate the behaviour.

  was:
Reproduction
============
Using VMTransport:

1. Produce a message on queue A and verify that queue A's memory usage increases
2. Consume the message from queue A and verify that queue A's memory usage 
decreases.
3. Resend the message to queue B.

Expected: Queue A's memory usage is not increased by the enqueue to queue B.
Actual: Queue A's memory usage increases and no memory usage increase occurs on 
queue B.

Symptom
=======
When messages are moved between queues using the VMTransport, they continue to 
contribute to the memory usage of the source queue rather than the destination 
queue.

The correct behaviour (memory usage decreases from queue A and increases in 
queue B) is exhibited by non-VMTransport (e.g., TCP).

Cause
=====
When the message is first sent to queue A, it's memoryUsage field is set to 
match queue A's:

{code:title=org.apache.activemq.broker.region.Queue}
public void send(final ProducerBrokerExchange producerExchange, final Message 
message) throws Exception {
    final ConnectionContext context = producerExchange.getConnectionContext();
    // There is delay between the client sending it and it arriving at the
    // destination.. it may have expired.
    message.setRegionDestination(this);
...
{code}

{code:title=org.apache.activemq.command.Message}
public void setRegionDestination(org.apache.activemq.broker.region.Destination 
destination) {
    this.regionDestination = destination;
    if(this.memoryUsage==null) {
        this.memoryUsage=regionDestination.getMemoryUsage();
    }
}
{code}

As the message moves across the transport, it is copied along with the 
memoryUsage field:

{code:title=org.apache.activemq.command.Message}
protected void copy(Message copy) {
    super.copy(copy);
...
    copy.memoryUsage=this.memoryUsage;
...
{code}

When the message is sent to the second queue, memoryUsage is non-null, so 
setRegionDestination(...) does not update memoryUsage to reflect the new 
destination queue.

When the destination queue accepts the message, the memoryUsage of the source 
queue is (incorrectly) increased:

{code:title=org.apache.activemq.command.Message}
public int incrementReferenceCount() {
    int rc;
    int size;
    synchronized (this) {
        rc = ++referenceCount;
        size = getSize();
    }

    if (rc == 1 && getMemoryUsage() != null) {
        getMemoryUsage().increaseUsage(size);
{code}

This mal-behaviour is not exhibited by other transports since they serialize 
Message and memoryUsage is transient.  As a result, the call to 
setRegionDestination(...) will properly update memoryUsage when the message 
arrives at the destination queue.

Solution
========
There are a number of possible solutions, any of which would correct the 
behaviour (although I am unsure what side-effects they may have on other 
behaviour):

1. It seems odd that memoryUsage is copied when Message is copied.  If 
Message.copy(...) is used as a shortcut to avoid serialization/deserialization 
on VMTransport, then it should have the same semantics and avoid copying 
transient fields.

2. It seems odd that setRegionDestination(...) would not always set the 
memoryUsage to match the destination's memoryUsage.

3. ActiveMQConnection has a comment regarding concessions made for messages 
transmitted by the VM transport:

{code:org.apache.activemq.ActiveMQConnection}
public void onCommand(final Object o) {
    final Command command = (Command)o;
    if (!closed.get() && command != null) {
        try {
            command.visit(new CommandVisitorAdapter() {
                @Override
                public Response processMessageDispatch(MessageDispatch md) 
throws Exception {
                    waitForTransportInterruptionProcessingToComplete();
                    ActiveMQDispatcher dispatcher = 
dispatchers.get(md.getConsumerId());
                    if (dispatcher != null) {
                        // Copy in case a embedded broker is dispatching via
                        // vm://
                        // md.getMessage() == null to signal end of queue
                        // browse.
                        Message msg = md.getMessage();
                        if (msg != null) {
                            msg = msg.copy();
                            msg.setReadOnlyBody(true);
                            msg.setReadOnlyProperties(true);
                            msg.setRedeliveryCounter(md.getRedeliveryCounter());
                            msg.setConnection(ActiveMQConnection.this);
                            md.setMessage(msg);
                        }
                        dispatcher.dispatch(md);
                    }
                    return null;
                }
{code}

Adding a call to msg.setMemoryUsage(null) would address this bug.

The latter appears to be the least intrusive, although it will only address the 
case of VMTransport messages moving between producers/consumers. Queue contains 
shortcut methods for moving messages between queues (e.g., copyMessageTo).  I 
have not verified if these methods exhibit the same behaviour re: memory usage, 
but if so, they would not be addressed by patching ActiveMQConnection.

Our main concern is with the reported use case, so I've attached a patch for 
ActiveMQConnection and unit test to demonstrate the behaviour.

    
> Memory usage is not cleared from the source queue when a message is moved to 
> another queue over the VMTransport
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4116
>                 URL: https://issues.apache.org/jira/browse/AMQ-4116
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.7.0
>            Reporter: Stirling Chow
>
> Reproduction
> ============
> Using VMTransport:
> 1. Produce a message on queue A and verify that queue A's memory usage 
> increases
> 2. Consume the message from queue A and verify that queue A's memory usage 
> decreases.
> 3. Resend the message to queue B.
> Expected: Queue A's memory usage is not increased by the enqueue to queue B.
> Actual: Queue A's memory usage increases and no memory usage increase occurs 
> on queue B.
> Symptom
> =======
> When messages are moved between queues using the VMTransport, they continue 
> to contribute to the memory usage of the source queue rather than the 
> destination queue.
> The correct behaviour (memory usage decreases from queue A and increases in 
> queue B) is exhibited by non-VMTransport (e.g., TCP).
> Cause
> =====
> When the message is first sent to queue A, it's memoryUsage field is set to 
> match queue A's:
> {code:title=org.apache.activemq.broker.region.Queue}
> public void send(final ProducerBrokerExchange producerExchange, final Message 
> message) throws Exception {
>     final ConnectionContext context = producerExchange.getConnectionContext();
>     // There is delay between the client sending it and it arriving at the
>     // destination.. it may have expired.
>     message.setRegionDestination(this);
> ...
> {code}
> {code:title=org.apache.activemq.command.Message}
> public void 
> setRegionDestination(org.apache.activemq.broker.region.Destination 
> destination) {
>     this.regionDestination = destination;
>     if(this.memoryUsage==null) {
>         this.memoryUsage=regionDestination.getMemoryUsage();
>     }
> }
> {code}
> As the message moves across the transport, it is copied along with the 
> memoryUsage field:
> {code:title=org.apache.activemq.command.Message}
> protected void copy(Message copy) {
>     super.copy(copy);
> ...
>     copy.memoryUsage=this.memoryUsage;
> ...
> {code}
> When the message is sent to the second queue, memoryUsage is non-null, so 
> setRegionDestination(...) does not update memoryUsage to reflect the new 
> destination queue.
> When the destination queue accepts the message, the memoryUsage of the source 
> queue is (incorrectly) increased:
> {code:title=org.apache.activemq.command.Message}
> public int incrementReferenceCount() {
>     int rc;
>     int size;
>     synchronized (this) {
>         rc = ++referenceCount;
>         size = getSize();
>     }
>     if (rc == 1 && getMemoryUsage() != null) {
>         getMemoryUsage().increaseUsage(size);
> {code}
> This mal-behaviour is not exhibited by other transports since they serialize 
> Message and memoryUsage is transient.  As a result, the call to 
> setRegionDestination(...) will properly update memoryUsage when the message 
> arrives at the destination queue.
> Solution
> ========
> There are a number of possible solutions, any of which would correct the 
> behaviour (although I am unsure what side-effects they may have on other 
> behaviour):
> 1. It seems odd that memoryUsage is copied when Message is copied.  If 
> Message.copy(...) is used as a shortcut to avoid 
> serialization/deserialization on VMTransport, then it should have the same 
> semantics and avoid copying transient fields.
> 2. It seems odd that setRegionDestination(...) would not always set the 
> memoryUsage to match the destination's memoryUsage.
> 3. ActiveMQConnection has a comment regarding concessions made for messages 
> transmitted by the VM transport:
> {code:title=org.apache.activemq.ActiveMQConnection}
> public void onCommand(final Object o) {
>     final Command command = (Command)o;
>     if (!closed.get() && command != null) {
>         try {
>             command.visit(new CommandVisitorAdapter() {
>                 @Override
>                 public Response processMessageDispatch(MessageDispatch md) 
> throws Exception {
>                     waitForTransportInterruptionProcessingToComplete();
>                     ActiveMQDispatcher dispatcher = 
> dispatchers.get(md.getConsumerId());
>                     if (dispatcher != null) {
>                         // Copy in case a embedded broker is dispatching via
>                         // vm://
>                         // md.getMessage() == null to signal end of queue
>                         // browse.
>                         Message msg = md.getMessage();
>                         if (msg != null) {
>                             msg = msg.copy();
>                             msg.setReadOnlyBody(true);
>                             msg.setReadOnlyProperties(true);
>                             
> msg.setRedeliveryCounter(md.getRedeliveryCounter());
>                             msg.setConnection(ActiveMQConnection.this);
>                             md.setMessage(msg);
>                         }
>                         dispatcher.dispatch(md);
>                     }
>                     return null;
>                 }
> {code}
> Adding a call to msg.setMemoryUsage(null) would address this bug.
> The latter appears to be the least intrusive, although it will only address 
> the case of VMTransport messages moving between producers/consumers. Queue 
> contains shortcut methods for moving messages between queues (e.g., 
> copyMessageTo).  I have not verified if these methods exhibit the same 
> behaviour re: memory usage, but if so, they would not be addressed by 
> patching ActiveMQConnection.
> Our main concern is with the reported use case, so I've attached a patch for 
> ActiveMQConnection and unit test to demonstrate the behaviour.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to