Repository: activemq Updated Branches: refs/heads/trunk 1abc95b6a -> 3afde7bac
https://issues.apache.org/jira/browse/AMQ-5346 Need to call free() on the AMQP resources now to ensure their state data is fully cleaned up. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3afde7ba Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3afde7ba Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3afde7ba Branch: refs/heads/trunk Commit: 3afde7bac7d6bf536f69abb25f5705eac9e968ae Parents: 1abc95b Author: Timothy Bish <[email protected]> Authored: Mon Sep 8 17:48:22 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Sep 8 17:48:22 2014 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 5 +++-- .../activemq/transport/amqp/JMSClientTest.java | 21 +++++++++++--------- 2 files changed, 15 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3afde7ba/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 186d790..71a4ad7 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -32,6 +32,7 @@ import javax.jms.InvalidSelectorException; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionError; 
import org.apache.activemq.command.ConnectionId; @@ -53,7 +54,6 @@ import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionInfo; -import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; @@ -83,7 +83,6 @@ import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.Event.Type.*; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sasl; @@ -310,6 +309,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } else if (remoteState == EndpointState.CLOSED) { ((AmqpDeliveryListener) link.getContext()).onClose(); link.close(); + link.free(); } } @@ -481,6 +481,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { session.setContext(null); } session.close(); + session.free(); } private void onLinkOpen(Link link) { http://git-wip-us.apache.org/repos/asf/activemq/blob/3afde7ba/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 479dd14..2122470 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -46,11 +46,9 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; 
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; import org.apache.activemq.util.Wait; - import org.junit.After; import org.junit.Before; import org.junit.Test; - import org.objectweb.jtests.jms.framework.TestConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -284,7 +282,7 @@ public class JMSClientTest extends JMSClientTestSupport { assertEquals("hello + 9", ((TextMessage) msg).getText()); } } - + abstract class Testable implements Runnable { protected String msg; synchronized boolean passed() { @@ -307,8 +305,9 @@ public class JMSClientTest extends JMSClientTestSupport { producer.setDeliveryMode(DeliveryMode.PERSISTENT); final Message m = session.createTextMessage("Sample text"); - + Testable t = new Testable() { + @Override public synchronized void run() { try { for (int i = 0; i < 30; ++i) { @@ -329,7 +328,7 @@ public class JMSClientTest extends JMSClientTestSupport { //wait until we know that the producer was able to send a message producer.wait(10000); } - + stopBroker(); assertTrue(t.passed()); } @@ -341,8 +340,9 @@ public class JMSClientTest extends JMSClientTestSupport { final Queue queue = session.createQueue(getDestinationName()); connection.start(); - + Testable t = new Testable() { + @Override public synchronized void run() { try { for (int i = 0; i < 10; ++i) { @@ -400,6 +400,7 @@ public class JMSClientTest extends JMSClientTestSupport { final MessageConsumer consumer=session.createConsumer(queue); Testable t = new Testable() { + @Override public synchronized void run() { try { for (int i = 0; i < 10; ++i) { @@ -414,7 +415,7 @@ public class JMSClientTest extends JMSClientTestSupport { LOG.info("Caught exception on receiveNoWait: {}", ex); } } - + }; synchronized (consumer) { new Thread(t).start(); @@ -433,6 +434,7 @@ public class JMSClientTest extends JMSClientTestSupport { final MessageConsumer consumer=session.createConsumer(queue); Testable t = new Testable() { + @Override public synchronized void run() { try { for 
(int i = 0; i < 10; ++i) { @@ -464,8 +466,9 @@ public class JMSClientTest extends JMSClientTestSupport { connection.start(); final MessageConsumer consumer=session.createConsumer(queue); - + Testable t = new Testable() { + @Override public synchronized void run() { try { Message m = consumer.receive(1); @@ -747,7 +750,7 @@ public class JMSClientTest extends JMSClientTestSupport { s.createTemporaryQueue().delete(); stopBroker(); - + assertTrue("No exception listener event fired.", called.await(15, TimeUnit.SECONDS)); }
