Repository: activemq Updated Branches: refs/heads/trunk 79ba2a79c -> 29f34f4da
AMQ-5107: Do not resend messages while the broker is shutting down. Includes junit. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/29f34f4d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/29f34f4d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/29f34f4d Branch: refs/heads/trunk Commit: 29f34f4dab68ceb3138a6194617fa8f13f4d3875 Parents: 79ba2a7 Author: artnaseef <a...@artnaseef.com> Authored: Thu Mar 27 08:07:59 2014 -0700 Committer: artnaseef <a...@artnaseef.com> Committed: Thu Mar 27 08:07:59 2014 -0700 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 7 +- .../region/QueueResendDuringShutdownTest.java | 250 +++++++++++++++++++ 2 files changed, 256 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/29f34f4d/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 60c6228..b6af75c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -558,6 +558,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { } for (MessageReference ref : unAckedMessages) { + // AMQ-5107: don't resend if the broker is shutting down + if ( this.brokerService.isStopping() ) { + break; + } QueueMessageReference qmr = (QueueMessageReference) ref; if (qmr.getLockOwner() == sub) { qmr.unlock(); @@ -583,7 +587,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { ((QueueBrowserSubscription)sub).decrementQueueRef(); browserDispatches.remove(sub); } - if (!redeliveredWaitingDispatch.isEmpty()) { + // AMQ-5107: don't resend if the broker is shutting down + if (!redeliveredWaitingDispatch.isEmpty() && (! this.brokerService.isStopping())) { doDispatch(new OrderedPendingList()); } } finally { http://git-wip-us.apache.org/repos/asf/activemq/blob/29f34f4d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java new file mode 100644 index 0000000..0439fa8 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.io.File; + +import static org.junit.matchers.JUnitMatchers.containsString; +import static org.junit.Assert.*; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.broker.BrokerService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.junit.*; + +/** + * Confirm that the broker does not resend unacknowledged messages during a broker shutdown. + */ +public class QueueResendDuringShutdownTest { + private static final Logger LOG = LoggerFactory.getLogger(QueueResendDuringShutdownTest.class); + public static final int NUM_CONNECTION_TO_TEST = 8; + + private static boolean iterationFoundFailure = false; + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private Connection[] connections; + private Connection producerConnection; + private Queue queue; + + private Object messageReceiveSync = new Object(); + private int receiveCount; + + @Before + public void setUp () throws Exception { + this.receiveCount = 0; + + this.broker = new BrokerService(); + this.broker.setPersistent(false); + this.broker.start(); + this.broker.waitUntilStarted(); + + this.factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + this.queue = new ActiveMQQueue("TESTQUEUE"); + + connections = new Connection[NUM_CONNECTION_TO_TEST]; + int iter = 0; + while ( iter < NUM_CONNECTION_TO_TEST ) { + this.connections[iter] = factory.createConnection(); + iter++; + } + + this.producerConnection = factory.createConnection(); + this.producerConnection.start(); + } + + @After + public void cleanup () throws Exception { + for ( Connection oneConnection : connections ) { + if ( oneConnection != null ) { + closeConnection(oneConnection); + } + } + connections = null; + + if ( this.producerConnection != null ) { + closeConnection(this.producerConnection); + this.producerConnection = null; + } + + this.broker.stop(); + this.broker.waitUntilStopped(); + } + + @Test(timeout=3000) + public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter1 () throws Throwable { + runTestIteration(); + } + + @Test(timeout=3000) + public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter2 () throws Throwable { + runTestIteration(); + } + + @Test(timeout=3000) + public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter3 () throws Throwable { + runTestIteration(); + } + + /** + * Run one iteration of the test, skipping it if a failure was found on a prior iteration since a single failure is + * enough. Also keep track of the state of failure for the iteration. + */ + protected void runTestIteration () throws Throwable { + if ( iterationFoundFailure ) { + LOG.info("skipping test iteration; failure previously detected"); + return; + } try { + testRedeliverAtBrokerShutdownAutoAckMsgListener(); + } catch ( Throwable thrown ) { + iterationFoundFailure = true; + throw thrown; + } + } + + protected void testRedeliverAtBrokerShutdownAutoAckMsgListener () throws Exception { + // Start consumers on all of the connections + for ( Connection oneConnection : connections ) { + MessageConsumer consumer = startupConsumer(oneConnection, false, Session.AUTO_ACKNOWLEDGE); + configureMessageListener(consumer); + oneConnection.start(); + } + + // Send one message to the Queue and wait a short time for the dispatch to occur. + this.sendMessage(); + waitForMessage(1000); + + // Verify one consumer received it + assertEquals(1, this.receiveCount); + + // Shutdown the broker + this.broker.stop(); + this.broker.waitUntilStopped(); + delay(100, "give queue time flush"); + + // Verify still only one consumer received it + assertEquals(1, this.receiveCount); + } + + /** + * Start a consumer on the given connection using the session transaction and acknowledge settings given. + */ + protected MessageConsumer startupConsumer (Connection conn, boolean transInd, int ackMode) + throws JMSException { + Session sess; + MessageConsumer consumer; + + sess = conn.createSession(transInd, ackMode); + consumer = sess.createConsumer(queue); + + return consumer; + } + + /** + * Mark the receipt of a message from one of the consumers. + */ + protected void messageReceived () { + synchronized ( this ) { + this.receiveCount++; + synchronized ( this.messageReceiveSync ) { + this.messageReceiveSync.notifyAll(); + } + } + } + + /** + * Setup the MessageListener for the given consumer. The listener uses a long delay on receiving the message to + * simulate the reported case of problems at shutdown caused by a message listener's connection closing while it is + * still processing. + */ + protected void configureMessageListener (MessageConsumer consumer) throws JMSException { + final MessageConsumer fConsumer = consumer; + + consumer.setMessageListener(new MessageListener() { + public void onMessage (Message msg) { + LOG.debug("got a message on consumer {}", fConsumer); + messageReceived(); + + // Delay long enough for the consumer to get closed while this delay is active. + delay(3000, "pause so connection shutdown leads to unacked message redelivery"); + } + }); + } + + /** + * Send a test message now. + */ + protected void sendMessage () throws JMSException { + Session sess = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = sess.createProducer(queue); + prod.send(sess.createTextMessage("X-TEST-MSG-X")); + prod.close(); + sess.close(); + } + + /** + * Close the given connection safely and log any exception caught. + */ + protected void closeConnection (Connection conn) { + try { + conn.close(); + } catch ( JMSException jmsExc ) { + LOG.info("failed to cleanup connection", jmsExc); + } + } + + /** + * Pause for the given length of time, in milliseconds, logging an interruption if one occurs. Don't try to + * recover from interrupt - the test case does not support interrupting and such an occurrence likely means the + * test is being aborted. + */ + protected void delay (long delayMs, String desc) { + try { + Thread.sleep(delayMs); + } catch ( InterruptedException intExc ) { + LOG.warn("sleep interrupted: " + desc, intExc); + } + } + + /** + * Wait up to the specified duration for a message to be received by any consumer. + */ + protected void waitForMessage (long delayMs) { + try { + synchronized ( this.messageReceiveSync ) { + if ( this.receiveCount == 0 ) { + this.messageReceiveSync.wait(delayMs); + } + } + } catch ( InterruptedException intExc ) { + LOG.warn("sleep interrupted: wait for message to arrive"); + } + } +}