Repository: activemq Updated Branches: refs/heads/master b6fea8312 -> 2c53dbcc6
https://issues.apache.org/jira/browse/AMQ-5864 - fix and test. A replayed update command did not check if already updated Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2c53dbcc Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2c53dbcc Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2c53dbcc Branch: refs/heads/master Commit: 2c53dbcc634920b42ab19a02a183219e58f1ecac Parents: b6fea83 Author: gtully <[email protected]> Authored: Thu Jun 25 15:53:45 2015 +0100 Committer: gtully <[email protected]> Committed: Thu Jun 25 15:53:45 2015 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 11 +- .../activemq/broker/RedeliveryRecoveryTest.java | 139 +++++++++++++++++++ 2 files changed, 147 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2c53dbcc/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index e86bed0..0ff203c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1320,8 +1320,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } metadata.lastUpdate = location; } else { - // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt - LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); + + MessageKeys messageKeys = sd.orderIndex.get(tx, previous); + if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { + // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt + LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); + } sd.messageIdIndex.put(tx, command.getMessageId(), previous); sd.locationIndex.remove(tx, location); id = -1; @@ -1365,7 +1369,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe new MessageKeys(command.getMessageId(), location) ); sd.locationIndex.put(tx, location, id); - if(previousKeys != null) { + // on first update previous is original location, on recovery/replay it may be the updated location + if(previousKeys != null && !previousKeys.location.equals(location)) { sd.locationIndex.remove(tx, previousKeys.location); } metadata.lastUpdate = location; http://git-wip-us.apache.org/repos/asf/activemq/blob/2c53dbcc/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRecoveryTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRecoveryTest.java new file mode 100644 index 0000000..7a59c06 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRecoveryTest.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class RedeliveryRecoveryTest { + + static final Logger LOG = LoggerFactory.getLogger(RedeliveryRecoveryTest.class); + ActiveMQConnection connection; + BrokerService broker = null; + String queueName = "redeliveryRecoveryQ"; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + configureBroker(broker); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + broker.stop(); + } + + protected void configureBroker(BrokerService broker) throws Exception { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setPersistJMSRedelivered(true); + policyMap.setDefaultEntry(policy); + broker.setDestinationPolicy(policyMap); + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + kahaDBPersistenceAdapter.setForceRecoverIndex(true); + broker.addConnector("tcp://0.0.0.0:0"); + } + + @org.junit.Test + public void testValidateRedeliveryFlagAfterRestart() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + populateDestination(1, destination, connection); + MessageConsumer consumer = session.createConsumer(destination); + Message msg = consumer.receive(5000); + LOG.info("got: " + msg); + assertNotNull("got the message", msg); + assertFalse("got the message", msg.getJMSRedelivered()); + consumer.close(); + connection.close(); + + restartBroker(); + + connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(queueName); + consumer = session.createConsumer(destination); + + msg = consumer.receive(5000); + LOG.info("got: " + msg); + assertNotNull("got the message", msg); + assertTrue("got the message has redelivered flag", msg.getJMSRedelivered()); + + connection.close(); + } + + + private void restartBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = createRestartedBroker(); + broker.start(); + } + + private BrokerService createRestartedBroker() throws Exception { + broker = new BrokerService(); + configureBroker(broker); + return broker; + } + + private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 1; i <= nbMessages; i++) { + producer.send(session.createTextMessage("<hello id='" + i + "'/>")); + } + producer.close(); + session.close(); + } + +} \ No newline at end of file
