Stirling Chow created AMQ-4116:
----------------------------------
Summary: Memory usage is not cleared from the source queue when a
message is moved to another queue over the VMTransport
Key: AMQ-4116
URL: https://issues.apache.org/jira/browse/AMQ-4116
Project: ActiveMQ
Issue Type: Bug
Components: Transport
Affects Versions: 5.7.0
Reporter: Stirling Chow
Reproduction
============
Using VMTransport:
1. Produce a message on queue A and verify that queue A's memory usage increases
2. Consume the message from queue A and verify that queue A's memory usage
decreases.
3. Resend the message to queue B.
Expected: Queue A's memory usage is not increased by the enqueue to queue B.
Actual: Queue A's memory usage increases and no memory usage increase occurs on
queue B.
Symptom
=======
When messages are moved between queues using the VMTransport, they continue to
contribute to the memory usage of the source queue rather than the destination
queue.
The correct behaviour (memory usage decreases from queue A and increases in
queue B) is exhibited by non-VMTransport (e.g., TCP).
Cause
=====
When the message is first sent to queue A, it's memoryUsage field is set to
match queue A's:
{code:title=org.apache.activemq.broker.region.Queue}
public void send(final ProducerBrokerExchange producerExchange, final Message
message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
message.setRegionDestination(this);
...
{code}
{code:title=org.apache.activemq.command.Message}
public void setRegionDestination(org.apache.activemq.broker.region.Destination
destination) {
this.regionDestination = destination;
if(this.memoryUsage==null) {
this.memoryUsage=regionDestination.getMemoryUsage();
}
}
{code}
As the message moves across the transport, it is copied along with the
memoryUsage field:
{code:title=org.apache.activemq.command.Message}
protected void copy(Message copy) {
super.copy(copy);
...
copy.memoryUsage=this.memoryUsage;
...
{code}
When the message is sent to the second queue, memoryUsage is non-null, so
setRegionDestination(...) does not update memoryUsage to reflect the new
destination queue.
When the destination queue accepts the message, the memoryUsage of the source
queue is (incorrectly) increased:
{code:title=org.apache.activemq.command.Message}
public int incrementReferenceCount() {
int rc;
int size;
synchronized (this) {
rc = ++referenceCount;
size = getSize();
}
if (rc == 1 && getMemoryUsage() != null) {
getMemoryUsage().increaseUsage(size);
{code}
This mal-behaviour is not exhibited by other transports since they serialize
Message and memoryUsage is transient. As a result, the call to
setRegionDestination(...) will properly update memoryUsage when the message
arrives at the destination queue.
Solution
========
There are a number of possible solutions, any of which would correct the
behaviour (although I am unsure what side-effects they may have on other
behaviour):
1. It seems odd that memoryUsage is copied when Message is copied. If
Message.copy(...) is used as a shortcut to avoid serialization/deserialization
on VMTransport, then it should have the same semantics and avoid copying
transient fields.
2. It seems odd that setRegionDestination(...) would not always set the
memoryUsage to match the destination's memoryUsage.
3. ActiveMQConnection has a comment regarding concessions made for messages
transmitted by the VM transport:
{code:org.apache.activemq.ActiveMQConnection}
public void onCommand(final Object o) {
final Command command = (Command)o;
if (!closed.get() && command != null) {
try {
command.visit(new CommandVisitorAdapter() {
@Override
public Response processMessageDispatch(MessageDispatch md)
throws Exception {
waitForTransportInterruptionProcessingToComplete();
ActiveMQDispatcher dispatcher =
dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via
// vm://
// md.getMessage() == null to signal end of queue
// browse.
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
md.setMessage(msg);
}
dispatcher.dispatch(md);
}
return null;
}
{code}
Adding a call to msg.setMemoryUsage(null) would address this bug.
The latter appears to be the least intrusive, although it will only address the
case of VMTransport messages moving between producers/consumers. Queue contains
shortcut methods for moving messages between queues (e.g., copyMessageTo). I
have not verified if these methods exhibit the same behaviour re: memory usage,
but if so, they would not be addressed by patching ActiveMQConnection.
Our main concern is with the reported use case, so I've attached a patch for
ActiveMQConnection and unit test to demonstrate the behaviour.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira