Repository: activemq Updated Branches: refs/heads/trunk 266d23ef7 -> 75eb814ca
https://issues.apache.org/jira/browse/AMQ-5068 - don't rewrite durable subs as the message instance is shared Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/75eb814c Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/75eb814c Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/75eb814c Branch: refs/heads/trunk Commit: 75eb814ca7f05a78129628606488d47d89f7cf85 Parents: 266d23e Author: gtully <[email protected]> Authored: Wed Mar 26 11:50:13 2014 +0000 Committer: gtully <[email protected]> Committed: Wed Mar 26 11:50:13 2014 +0000 ---------------------------------------------------------------------- .../broker/region/policy/PolicyEntry.java | 3 +- .../activemq/broker/RedeliveryRestartTest.java | 57 +++++++++++++++++--- 2 files changed, 51 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/75eb814c/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 624d490..f7ae6c1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -128,6 +128,7 @@ public class PolicyEntry extends DestinationMapEntry { queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); + queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); } public void update(Queue queue) { @@ -142,6 +143,7 @@ public class PolicyEntry extends DestinationMapEntry { queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); + queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); } public void configure(Broker broker,Topic topic) { @@ -197,7 +199,6 @@ public class PolicyEntry extends DestinationMapEntry { destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); - destination.setPersistJMSRedelivered(isPersistJMSRedelivered()); } public void baseConfiguration(Broker broker, BaseDestination destination) { http://git-wip-us.apache.org/repos/asf/activemq/blob/75eb814c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java index 8eba729..032934b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java @@ -24,11 +24,13 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.TestSupport; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.transport.failover.FailoverTransport; import org.junit.After; import org.junit.Before; @@ -91,10 +93,9 @@ public class RedeliveryRestartTest extends TestSupport { connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); - populateDestination(10, queueName, connection); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); + populateDestination(10, destination, connection); MessageConsumer consumer = session.createConsumer(destination); TextMessage msg = null; @@ -136,17 +137,59 @@ public class RedeliveryRestartTest extends TestSupport { } @org.junit.Test - public void testValidateRedeliveryFlagAfterRestart() throws Exception { + public void testDurableSubRedeliveryFlagAfterRestartNotSupported() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() + ")?jms.prefetchPolicy.all=0"); connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.setClientID("id"); connection.start(); - populateDestination(10, queueName, connection); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ActiveMQTopic destination = new ActiveMQTopic(queueName); + + TopicSubscriber durableSub = session.createDurableSubscriber(destination, "id"); + + populateDestination(10, destination, connection); + + TextMessage msg = null; + for (int i = 0; i < 5; i++) { + msg = (TextMessage) durableSub.receive(20000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + } + durableSub.close(); + + restartBroker(); + + // make failover aware of the restarted auto assigned port + connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0) + .getPublishableConnectString()); + + durableSub = session.createDurableSubscriber(destination, "id"); + for (int i = 0; i < 10; i++) { + msg = (TextMessage) durableSub.receive(4000); + LOG.info("redelivered? got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("no reDelivery flag", false, msg.getJMSRedelivered()); + msg.acknowledge(); + } + connection.close(); + } + + @org.junit.Test + public void testValidateRedeliveryFlagAfterRestart() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() + + ")?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Destination destination = session.createQueue(queueName); + populateDestination(10, destination, connection); MessageConsumer consumer = session.createConsumer(destination); TextMessage msg = null; @@ -196,10 +239,9 @@ public class RedeliveryRestartTest extends TestSupport { connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); - populateDestination(1, queueName, connection); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Destination destination = session.createQueue(queueName); + populateDestination(1, destination, connection); MessageConsumer consumer = session.createConsumer(destination); TextMessage msg = (TextMessage) consumer.receive(5000); @@ -243,9 +285,8 @@ public class RedeliveryRestartTest extends TestSupport { return broker; } - private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException { + private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(destinationName); MessageProducer producer = session.createProducer(destination); for (int i = 1; i <= nbMessages; i++) { producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
