Repository: activemq
Updated Branches:
  refs/heads/master a9f9d4a4d -> 406a34294


https://issues.apache.org/jira/browse/AMQ-6465

Properly decrementing the message reference count in
DemandForwardingBridgeSupport when message suppression is checked for
durable subscriptions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/406a3429
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/406a3429
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/406a3429

Branch: refs/heads/master
Commit: 406a34294befca4ba2de4b728c86f64b96a94945
Parents: a9f9d4a
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Fri Oct 14 12:15:06 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Fri Oct 14 12:16:07 2016 -0400

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  3 +
 .../activemq/network/SimpleNetworkTest.java     | 89 ++++++++++++++++++++
 2 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/406a3429/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 70449f0..b58259d 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1158,6 +1158,9 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
             messageEvalContext.setMessageReference(md.getMessage());
             messageEvalContext.setDestination(md.getDestination());
             suppress = 
!sub.getNetworkBridgeFilter().matches(messageEvalContext);
+            //AMQ-6465 - Need to decrement the reference count after checking 
matches() as
+            //the call above will increment the reference count by 1
+            messageEvalContext.getMessageReference().decrementReferenceCount();
         }
         return suppress;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/406a3429/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
index 5d3376b..5c6b35a 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
@@ -43,9 +43,11 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.junit.After;
 import org.junit.Before;
@@ -198,6 +200,93 @@ public class SimpleNetworkTest {
         }));
     }
 
+    //Added for AMQ-6465 to make sure memory usage decreased back to 0 after 
messages are forwarded
+    //to the other broker
+    @Test(timeout = 60 * 1000)
+    public void testDurableTopicSubForwardMemoryUsage() throws Exception {
+        // create a remote durable consumer to create demand
+        MessageConsumer remoteConsumer = 
remoteSession.createDurableSubscriber(included, consumerName);
+        Thread.sleep(1000);
+
+        MessageProducer producer = localSession.createProducer(included);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message test = localSession.createTextMessage("test-" + i);
+            producer.send(test);
+        }
+        Thread.sleep(1000);
+
+        //Make sure stats are set
+        assertEquals(MESSAGE_COUNT,
+                
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
+
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+            }
+        }, 10000, 500));
+        remoteConsumer.close();
+    }
+
+    //Added for AMQ-6465 to make sure memory usage decreased back to 0 after 
messages are forwarded
+    //to the other broker
+    @Test(timeout = 60 * 1000)
+    public void testTopicSubForwardMemoryUsage() throws Exception {
+        // create a remote non-durable consumer to create demand
+        MessageConsumer remoteConsumer = 
remoteSession.createConsumer(included);
+        Thread.sleep(1000);
+
+        MessageProducer producer = localSession.createProducer(included);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message test = localSession.createTextMessage("test-" + i);
+            producer.send(test);
+        }
+        Thread.sleep(1000);
+
+        //Make sure stats are set
+        assertEquals(MESSAGE_COUNT,
+                
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
+
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+            }
+        }, 10000, 500));
+        remoteConsumer.close();
+    }
+
+    //Added for AMQ-6465 to make sure memory usage decreased back to 0 after 
messages are forwarded
+    //to the other broker
+    @Test(timeout = 60 * 1000)
+    public void testQueueSubForwardMemoryUsage() throws Exception {
+        ActiveMQQueue queue = new ActiveMQQueue("include.test.foo");
+        MessageConsumer remoteConsumer = remoteSession.createConsumer(queue);
+        Thread.sleep(1000);
+
+        MessageProducer producer = localSession.createProducer(queue);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message test = localSession.createTextMessage("test-" + i);
+            producer.send(test);
+        }
+        Thread.sleep(1000);
+
+        //Make sure stats are set
+        assertEquals(MESSAGE_COUNT,
+                
localBroker.getDestination(queue).getDestinationStatistics().getForwards().getCount());
+
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+            }
+        }, 10000, 500));
+        remoteConsumer.close();
+    }
+
     @Test(timeout = 60 * 1000)
     public void testDurableStoreAndForward() throws Exception {
         // create a remote durable consumer

Reply via email to