Repository: activemq
Updated Branches:
  refs/heads/trunk dcbac84a8 -> c6fe94ec0
https://issues.apache.org/jira/browse/AMQ-5070 - broker blocked on shutdown Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c6fe94ec Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c6fe94ec Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c6fe94ec Branch: refs/heads/trunk Commit: c6fe94ec0388f2acb169c9e3839ade5dc7c2c65f Parents: dcbac84 Author: Dejan Bosanac <[email protected]> Authored: Fri Feb 21 11:22:00 2014 +0100 Committer: Dejan Bosanac <[email protected]> Committed: Fri Feb 21 11:22:11 2014 +0100 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 10 +- .../activemq/broker/TransportConnector.java | 8 +- .../xbean/ConnectorXBeanConfigTest.java | 114 +++++++++++++++++-- .../apache/activemq/xbean/connector-test.xml | 26 +++-- 4 files changed, 137 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 972ffe2..65d044b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -144,9 +144,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor { throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); } Command command = (Command) o; - Response response = service(command); - if (response != null && !brokerService.isStopping() ) { - dispatchSync(response); + if 
(!brokerService.isStopping()) { + Response response = service(command); + if (response != null && !brokerService.isStopping()) { + dispatchSync(response); + } + } else { + throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); } } finally { serviceLock.readLock().unlock(); http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java index 582bc3f..5aa074b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -216,8 +216,12 @@ public class TransportConnector implements Connector, BrokerServiceAware { @Override public void run() { try { - Connection connection = createConnection(transport); - connection.start(); + if (!brokerService.isStopping()) { + Connection connection = createConnection(transport); + connection.start(); + } else { + throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); + } } catch (Exception e) { String remoteHost = transport.getRemoteAddress(); ServiceSupport.dispose(transport); http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java index 4b328dd..ad96459 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java +++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java @@ -18,13 +18,9 @@ package org.apache.activemq.xbean; import java.net.URI; import java.util.List; +import java.util.concurrent.CountDownLatch; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; +import javax.jms.*; import junit.framework.TestCase; @@ -36,6 +32,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.MessageIdList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,13 +83,116 @@ public class ConnectorXBeanConfigTest extends TestCase { Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); Destination dest = new ActiveMQQueue("test"); - MessageProducer producer = sess.createProducer(dest); MessageConsumer consumer = sess.createConsumer(dest); + MessageProducer producer = sess.createProducer(dest); producer.send(sess.createTextMessage("test")); TextMessage msg = (TextMessage)consumer.receive(1000); assertEquals("test", msg.getText()); } + + public void testBrokerWontStop() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?async=false"); + factory.setDispatchAsync(false); + factory.setAlwaysSessionAsync(false); + Connection conn = factory.createConnection(); + final Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + conn.start(); + final Destination dest = new ActiveMQQueue("TEST"); + final CountDownLatch stop = new CountDownLatch(1); + final CountDownLatch sendSecond = new CountDownLatch(1); + final CountDownLatch shutdown = new CountDownLatch(1); + final CountDownLatch test = new CountDownLatch(1); + + 
ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory("vm://localhost?async=false"); + Connection testConn = testFactory.createConnection(); + testConn.start(); + Destination testDestination = sess.createQueue("NEW"); + Session testSess = testConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer testProducer = testSess.createProducer(testDestination); + + final Thread consumerThread = new Thread() { + @Override + public void run() { + try { + MessageProducer producer = sess.createProducer(dest); + producer.send(sess.createTextMessage("msg1")); + MessageConsumer consumer = sess.createConsumer(dest); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + // send a message that will block + Thread.sleep(2000); + sendSecond.countDown(); + // try to stop the broker + Thread.sleep(5000); + stop.countDown(); + // run the test + Thread.sleep(5000); + test.countDown(); + shutdown.await(); + } catch (InterruptedException ie) { + } + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + consumerThread.start(); + + final Thread producerThread = new Thread() { + @Override + public void run() { + try { + sendSecond.await(); + MessageProducer producer = sess.createProducer(dest); + producer.send(sess.createTextMessage("msg2")); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + producerThread.start(); + + final Thread stopThread = new Thread() { + @Override + public void run() { + try { + stop.await(); + brokerService.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + stopThread.start(); + + test.await(); + try { + testSess.createConsumer(testDestination); + fail("Should have failed creating a consumer!"); + } catch (Exception e) { + e.printStackTrace(); + } + + try { + testProducer.send(testSess.createTextMessage("msg3")); + fail("Should have failed sending a message!"); + } catch (Exception e) { + e.printStackTrace(); + } + + 
shutdown.countDown(); + + + } + @Override protected void setUp() throws Exception { brokerService = createBroker(); http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml index 99dd04f..5fb6403 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml @@ -27,19 +27,27 @@ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> - <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core"> + <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" persistent="false"> + <destinationPolicy> + <policyMap> + <policyEntries> + <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb" optimizedDispatch="true"> + </policyEntry> + </policyEntries> + </policyMap> + </destinationPolicy> <networkConnectors> <networkConnector uri="static://(tcp://localhost:61616)"> - <dynamicallyIncludedDestinations> - <queue physicalName="include.test.foo"/> - <topic physicalName="include.test.bar"/> - </dynamicallyIncludedDestinations> - <excludedDestinations> - <queue physicalName="exclude.test.foo"/> - <topic physicalName="exclude.test.bar"/> - </excludedDestinations> + <dynamicallyIncludedDestinations> + <queue physicalName="include.test.foo"/> + <topic physicalName="include.test.bar"/> + </dynamicallyIncludedDestinations> + <excludedDestinations> + <queue physicalName="exclude.test.foo"/> + <topic physicalName="exclude.test.bar"/> + </excludedDestinations> </networkConnector> </networkConnectors>
