Repository: activemq Updated Branches: refs/heads/trunk bae0e60a7 -> dc25f2a8f
https://issues.apache.org/jira/browse/AMQ-5541 - support preemptive redelivery for non persistent case, fix and test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dc25f2a8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dc25f2a8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dc25f2a8 Branch: refs/heads/trunk Commit: dc25f2a8f9d4e54c2349946b1337eb4b72890e07 Parents: bae0e60 Author: gtully <[email protected]> Authored: Tue Jan 27 16:43:35 2015 +0000 Committer: gtully <[email protected]> Committed: Tue Jan 27 16:43:35 2015 +0000 ---------------------------------------------------------------------- .../activemq/broker/region/RegionBroker.java | 6 +- .../RedeliveryRestartWithExceptionTest.java | 59 +++++++++++++++++++- 2 files changed, 60 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/dc25f2a8/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 2943c98..3acc135 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -623,11 +623,13 @@ public class RegionBroker extends EmptyBroker { long totalTime = endTime - message.getBrokerInTime(); ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime); } - if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered() && message.isPersistent()) { + if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered()) { final int originalValue = message.getRedeliveryCounter(); message.incrementRedeliveryCounter(); try { - ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message); + if (message.isPersistent()) { + ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message); + } messageDispatch.setTransmitCallback(new TransmitCallback() { // dispatch is considered a delivery, so update sub state post dispatch otherwise // on a disconnect/reconnect cached messages will not reflect initial delivery attempt http://git-wip-us.apache.org/repos/asf/activemq/blob/dc25f2a8/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java index eae86d6..5d8b62e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.Set; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -44,6 +45,7 @@ import org.apache.activemq.store.ProxyTopicMessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.usage.SystemUsage; import org.junit.After; import org.junit.Before; @@ -97,7 +99,7 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); - populateDestination(10, destination, connection); + populateDestination(10, destination, connection, true); TextMessage msg = null; MessageConsumer consumer = session.createConsumer(destination); Exception expectedException = null; @@ -165,7 +167,7 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); - populateDestination(10, destination, connection); + populateDestination(10, destination, connection, true); TextMessage msg = null; MessageConsumer consumer = session.createConsumer(destination); Exception expectedException = null; @@ -218,6 +220,56 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport { connection.close(); } + @org.junit.Test + public void testValidateRedeliveryFlagOnNonPersistentAfterTransientFailureConnectionDrop() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + populateDestination(10, destination, connection, false); + TextMessage msg = null; + MessageConsumer consumer = session.createConsumer(destination); + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(5000); + assertNotNull("got the message", msg); + assertFalse("not redelivered", msg.getJMSRedelivered()); + } + + connection.getTransport().narrow(TcpTransport.class).getTransportListener().onException(new IOException("Die")); + + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(queueName); + consumer = session.createConsumer(destination); + + // consume the messages that were previously delivered + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("redelivered? got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("redelivery flag set on:" + i, true, msg.getJMSRedelivered()); + assertTrue("redelivery count survives reconnect for:" + i, msg.getLongProperty("JMSXDeliveryCount") > 1); + msg.acknowledge(); + } + + // consume the rest that were not redeliveries + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + msg.acknowledge(); + } + connection.close(); + } + private void restartBroker() throws Exception { broker.stop(); broker.waitUntilStopped(); @@ -231,9 +283,10 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport { return broker; } - private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException { + private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection, boolean persistent) throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); for (int i = 1; i <= nbMessages; i++) { producer.send(session.createTextMessage("<hello id='" + i + "'/>")); }
