Repository: activemq
Updated Branches:
  refs/heads/master b6fea8312 -> 2c53dbcc6


https://issues.apache.org/jira/browse/AMQ-5864 - fix and test. A replayed 
update command did not check if already updated


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2c53dbcc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2c53dbcc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2c53dbcc

Branch: refs/heads/master
Commit: 2c53dbcc634920b42ab19a02a183219e58f1ecac
Parents: b6fea83
Author: gtully <[email protected]>
Authored: Thu Jun 25 15:53:45 2015 +0100
Committer: gtully <[email protected]>
Committed: Thu Jun 25 15:53:45 2015 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  11 +-
 .../activemq/broker/RedeliveryRecoveryTest.java | 139 +++++++++++++++++++
 2 files changed, 147 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2c53dbcc/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index e86bed0..0ff203c 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1320,8 +1320,12 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
                 }
                 metadata.lastUpdate = location;
             } else {
-                // If the message ID is indexed, then the broker asked us to 
store a duplicate before the message was dispatched and acked, we ignore this 
add attempt
-                LOG.warn("Duplicate message add attempt rejected. Destination: 
{}://{}, Message id: {}", command.getDestination().getType(), 
command.getDestination().getName(), command.getMessageId());
+
+                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
+                if (messageKeys != null && 
messageKeys.location.compareTo(location) < 0) {
+                    // If the message ID is indexed, then the broker asked us 
to store a duplicate before the message was dispatched and acked, we ignore 
this add attempt
+                    LOG.warn("Duplicate message add attempt rejected. 
Destination: {}://{}, Message id: {}", command.getDestination().getType(), 
command.getDestination().getName(), command.getMessageId());
+                }
                 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                 sd.locationIndex.remove(tx, location);
                 id = -1;
@@ -1365,7 +1369,8 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
                     new MessageKeys(command.getMessageId(), location)
             );
             sd.locationIndex.put(tx, location, id);
-            if(previousKeys != null) {
+            // on first update previous is original location, on 
recovery/replay it may be the updated location
+            if(previousKeys != null && 
!previousKeys.location.equals(location)) {
                 sd.locationIndex.remove(tx, previousKeys.location);
             }
             metadata.lastUpdate = location;

http://git-wip-us.apache.org/repos/asf/activemq/blob/2c53dbcc/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRecoveryTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRecoveryTest.java
new file mode 100644
index 0000000..7a59c06
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRecoveryTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class RedeliveryRecoveryTest {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(RedeliveryRecoveryTest.class);
+    ActiveMQConnection connection;
+    BrokerService broker = null;
+    String queueName = "redeliveryRecoveryQ";
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        configureBroker(broker);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        broker.stop();
+    }
+
+    protected void configureBroker(BrokerService broker) throws Exception {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPersistJMSRedelivered(true);
+        policyMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(policyMap);
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = 
(KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        kahaDBPersistenceAdapter.setForceRecoverIndex(true);
+        broker.addConnector("tcp://0.0.0.0:0");
+    }
+
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+                + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(1, destination, connection);
+        MessageConsumer consumer = session.createConsumer(destination);
+        Message msg = consumer.receive(5000);
+        LOG.info("got: " + msg);
+        assertNotNull("got the message", msg);
+        assertFalse("got the message", msg.getJMSRedelivered());
+        consumer.close();
+        connection.close();
+
+        restartBroker();
+
+        connectionFactory = new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+                + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createQueue(queueName);
+        consumer = session.createConsumer(destination);
+
+        msg = consumer.receive(5000);
+        LOG.info("got: " + msg);
+        assertNotNull("got the message", msg);
+        assertTrue("got the message has redelivered flag", 
msg.getJMSRedelivered());
+
+        connection.close();
+    }
+
+
+    private void restartBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+    private BrokerService createRestartedBroker() throws Exception {
+        broker = new BrokerService();
+        configureBroker(broker);
+        return broker;
+    }
+
+    private void populateDestination(final int nbMessages, final Destination 
destination, javax.jms.Connection connection) throws JMSException {
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 1; i <= nbMessages; i++) {
+            producer.send(session.createTextMessage("<hello id='" + i + 
"'/>"));
+        }
+        producer.close();
+        session.close();
+    }
+
+}
\ No newline at end of file

Reply via email to