Repository: activemq-artemis Updated Branches: refs/heads/master c998e9a44 -> 3df29ca7f
ARTEMIS-211 Memory leak using wildcard topic Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ea9609c5 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ea9609c5 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ea9609c5 Branch: refs/heads/master Commit: ea9609c579839a4dedad634c277940a241d1b03d Parents: c998e9a Author: jbertram <[email protected]> Authored: Wed Aug 26 14:12:17 2015 -0500 Committer: jbertram <[email protected]> Committed: Wed Sep 2 15:47:00 2015 -0500 ---------------------------------------------------------------------- .../core/postoffice/impl/BindingsImpl.java | 44 ++++++++------- .../jms/client/TopicCleanupTest.java | 58 +++++++++++++++++++- 2 files changed, 80 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea9609c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 8b21396..4719971 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -109,7 +109,9 @@ public final class BindingsImpl implements Bindings { } } - bindings.add(binding); + if (!bindings.contains(binding)) { + bindings.add(binding); + } } bindingsMap.put(binding.getID(), binding); @@ -141,7 +143,7 @@ public final class BindingsImpl implements Bindings { bindingsMap.remove(binding.getID()); if (isTrace) { - ActiveMQServerLogger.LOGGER.trace("Removing binding " + binding + " into " + this + " bindingTable: " + debugBindings()); + ActiveMQServerLogger.LOGGER.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings()); } } @@ -509,54 +511,51 @@ public final class BindingsImpl implements Bindings { PrintWriter out = new PrintWriter(writer); - out.println("\n***************************************"); + out.println("\n**************************************************"); out.println("routingNameBindingMap:"); if (routingNameBindingMap.isEmpty()) { - out.println("EMPTY!"); + out.println("\tEMPTY!"); } for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) { - out.print("key=" + entry.getKey() + ", value=" + entry.getValue()); - // for (Binding bind : entry.getValue()) - // { - // out.print(bind + ","); - // } + out.println("\tkey=" + entry.getKey() + ", value(s):"); + for (Binding bind : entry.getValue()) { + out.println("\t\t" + bind); + } out.println(); } - out.println(); - - out.println("RoutingNamePositions:"); + out.println("routingNamePositions:"); if (routingNamePositions.isEmpty()) { - out.println("EMPTY!"); + out.println("\tEMPTY!"); } for (Map.Entry<SimpleString, Integer> entry : routingNamePositions.entrySet()) { - out.println("key=" + entry.getKey() + ", value=" + entry.getValue()); + out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue()); } out.println(); - out.println("BindingsMap:"); + out.println("bindingsMap:"); if (bindingsMap.isEmpty()) { - out.println("EMPTY!"); + out.println("\tEMPTY!"); } for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet()) { - out.println("Key=" + entry.getKey() + ", value=" + entry.getValue()); + out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue()); } out.println(); - out.println("ExclusiveBindings:"); + out.println("exclusiveBindings:"); if (exclusiveBindings.isEmpty()) { - out.println("EMPTY!"); + out.println("\tEMPTY!"); } for (Binding binding : exclusiveBindings) { - out.println(binding); + out.println("\t" + binding); } - out.println("#####################################################"); + out.println("####################################################"); return writer.toString(); } @@ -606,4 +605,7 @@ public final class BindingsImpl implements Bindings { return pos; } + public Map<SimpleString, List<Binding>> getRoutingNameBindingMap() { + return routingNameBindingMap; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea9609c5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java index 1074680..084060f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java @@ -22,15 +22,22 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import java.util.List; +import java.util.Map; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQTopic; +import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.junit.Test; /** @@ -97,4 +104,53 @@ public class TopicCleanupTest extends JMSTestBase { } + @Test + public void testWildcardSubscriber() throws Exception { + ActiveMQTopic topic = (ActiveMQTopic) createTopic("topic.A"); + Connection conn = cf.createConnection(); + conn.start(); + + try { + Session consumerStarSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerStar = consumerStarSession.createConsumer(ActiveMQJMSClient.createTopic("topic.*")); + + Session consumerASession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerA = consumerASession.createConsumer(topic); + + Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producerA = producerSession.createProducer(topic); + TextMessage msg1 = producerSession.createTextMessage("text"); + producerA.send(msg1); + + consumerStar.close(); + consumerA.close(); + + producerA.send(msg1); + + conn.close(); + + boolean foundStrayRoutingBinding = false; + Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(((ActiveMQTopic) topic).getAddress())); + Map<SimpleString, List<Binding>> routingNames = ((BindingsImpl) bindings).getRoutingNameBindingMap(); + for (SimpleString key : routingNames.keySet()) { + if (!key.toString().equals(topic.getAddress())) { + foundStrayRoutingBinding = true; + assertEquals(0, ((LocalQueueBinding) routingNames.get(key).get(0)).getQueue().getMessageCount()); + } + } + + assertFalse(foundStrayRoutingBinding); + } + finally { + jmsServer.stop(); + + jmsServer.start(); + + try { + conn.close(); + } + catch (Throwable igonred) { + } + } + } }
