This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 24b5944ecbf315866ce227862fb2d264f5b1654f
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Wed Jan 9 12:51:03 2019 -0500

    AMQ-7129 - Properly recover messages from KahaDB for a durable when there 
are
    messages to recover before the stored lastAck value
    
    With individual ack mode we need to check the durable ackPosition
    sequence set in the KahaDB index on subsription load to see if there are
    earlier messages before the lastAck value that still haven't been acked.
    While this normally wouldn't happen it is possible in individual ack
    mode
    
    (cherry picked from commit 25de20c77ec0bf6cdc699cac2ad50e34ec707453)
---
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  37 ++-
 .../activemq/store/kahadb/MessageDatabase.java     |   9 +
 .../kahadb/KahaDBDurableMessageRecoveryTest.java   | 350 +++++++++++++++++++++
 3 files changed, 394 insertions(+), 2 deletions(-)

diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index c552d79..cbbf9b6 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -79,6 +79,7 @@ import 
org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -1001,7 +1002,17 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         LastAck cursorPos = getLastAck(tx, sd, 
subscriptionKey);
-                        sd.orderIndex.setBatch(tx, cursorPos);
+                        SequenceSet subAckPositions = getSequenceSet(tx, sd, 
subscriptionKey);
+                        //If we have ackPositions tracked then compare the 
first one as individual acknowledge mode
+                        //may have bumped lastAck even though there are 
earlier messages to still consume
+                        if (subAckPositions != null && 
!subAckPositions.isEmpty()
+                                && subAckPositions.getHead().getFirst() < 
cursorPos.lastAckedSequence) {
+                            //we have messages to ack before lastAckedSequence
+                            sd.orderIndex.setBatch(tx, 
subAckPositions.getHead().getFirst() - 1);
+                        } else {
+                            subAckPositions = null;
+                            sd.orderIndex.setBatch(tx, cursorPos);
+                        }
                         recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, 
listener);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx); iterator
                                 .hasNext();) {
@@ -1009,6 +1020,11 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                             if 
(ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
+                            //If subAckPositions is set then verify the 
sequence set contains the message still
+                            //and if it doesn't skip it
+                            if (subAckPositions != null && 
!subAckPositions.contains(entry.getKey())) {
+                                continue;
+                            }
                             
listener.recoverMessage(loadMessage(entry.getValue().location));
                         }
                         sd.orderIndex.resetCursorPosition();
@@ -1033,13 +1049,24 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                         StoredDestination sd = getStoredDestination(dest, tx);
                         sd.orderIndex.resetCursorPosition();
                         MessageOrderCursor moc = 
sd.subscriptionCursors.get(subscriptionKey);
+                        SequenceSet subAckPositions = null;
                         if (moc == null) {
                             LastAck pos = getLastAck(tx, sd, subscriptionKey);
                             if (pos == null) {
                                 // sub deleted
                                 return;
                             }
-                            sd.orderIndex.setBatch(tx, pos);
+                            subAckPositions = getSequenceSet(tx, sd, 
subscriptionKey);
+                            //If we have ackPositions tracked then compare the 
first one as individual acknowledge mode
+                            //may have bumped lastAck even though there are 
earlier messages to still consume
+                            if (subAckPositions != null && 
!subAckPositions.isEmpty()
+                                    && subAckPositions.getHead().getFirst() < 
pos.lastAckedSequence) {
+                                //we have messages to ack before 
lastAckedSequence
+                                sd.orderIndex.setBatch(tx, 
subAckPositions.getHead().getFirst() - 1);
+                            } else {
+                                subAckPositions = null;
+                                sd.orderIndex.setBatch(tx, pos);
+                            }
                             moc = sd.orderIndex.cursor;
                         } else {
                             sd.orderIndex.cursor.sync(moc);
@@ -1053,6 +1080,11 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                             if 
(ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
+                            //If subAckPositions is set then verify the 
sequence set contains the message still
+                            //and if it doesn't skip it
+                            if (subAckPositions != null && 
!subAckPositions.contains(entry.getKey())) {
+                                continue;
+                            }
                             if 
(listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                 counter++;
                             }
@@ -1451,6 +1483,7 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                 super(runnable, null);
             }
 
+            @Override
             public void setException(final Throwable e) {
                 super.setException(e);
             }
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 5fb1e90..8bb902d 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2981,6 +2981,15 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
         return sd.subscriptionAcks.get(tx, subscriptionKey);
     }
 
+    protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, 
String subscriptionKey) throws IOException {
+        if (sd.ackPositions != null) {
+            final SequenceSet messageSequences = sd.ackPositions.get(tx, 
subscriptionKey);
+            return messageSequences;
+        }
+
+        return null;
+    }
+
     protected long getStoredMessageCount(Transaction tx, StoredDestination sd, 
String subscriptionKey) throws IOException {
         if (sd.ackPositions != null) {
             SequenceSet messageSequences = sd.ackPositions.get(tx, 
subscriptionKey);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
new file mode 100644
index 0000000..66890a9
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class KahaDBDurableMessageRecoveryTest {
+
+    @Parameters(name = "recoverIndex")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { false }, { true } });
+    }
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new 
File("target"));
+    private BrokerService broker;
+    private URI brokerConnectURI;
+
+    private boolean recoverIndex;
+
+    @Before
+    public void setUpBroker() throws Exception {
+        startBroker(false);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    /**
+     * @param deleteIndex
+     */
+    public KahaDBDurableMessageRecoveryTest(boolean recoverIndex) {
+        super();
+        this.recoverIndex = recoverIndex;
+    }
+
+    protected void startBroker(boolean recoverIndex) throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setDataDirectoryFile(dataFileDir.getRoot());
+
+        TransportConnector connector = broker.addConnector(new 
TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+        configurePersistence(broker, recoverIndex);
+
+        broker.start();
+        broker.waitUntilStarted();
+        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean 
forceRecoverIndex) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) 
brokerService.getPersistenceAdapter();
+
+        adapter.setForceRecoverIndex(forceRecoverIndex);
+
+        // set smaller size for test
+        adapter.setJournalMaxFileLength(1024 * 20);
+    }
+
+    protected void restartBroker(boolean deleteIndex) throws Exception {
+        stopBroker();
+        startBroker(deleteIndex);
+    }
+
+    protected Session getSession(int ackMode) throws Exception {
+        Connection connection = new 
ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId1");
+        connection.start();
+        Session session = connection.createSession(false, ackMode);
+
+        return session;
+    }
+
+    /**
+     * Test that on broker restart a durable topic subscription will recover 
all
+     * messages before the "last ack" in KahaDB which could happen if using
+     * individual acknowledge mode and skipping messages
+     */
+    @Test
+    public void durableRecoveryIndividualAcknowledge() throws Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, 
"sub1");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+
+        // Receive only the 5th message using individual ack mode
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            if (i == 5) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 9 messages left still and restart broker
+        assertTrue(Wait.waitFor(() -> 9 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+        subscriber.close();
+        restartBroker(recoverIndex);
+
+        // Verify 9 messages exist in store on startup
+        assertTrue(Wait.waitFor(() -> 9 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+
+        // Recreate subscriber and try and receive the other 9 messages
+        session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
+        subscriber = session.createDurableSubscriber(topic, "sub1");
+
+        for (int i = 1; i <= 4; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 6; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        subscriber.close();
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+    }
+
+    @Test
+    public void multipleDurableRecoveryIndividualAcknowledge() throws 
Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, 
"sub1");
+        TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, 
"sub2");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, 
"clientId1", "sub2"), 3000, 500));
+
+        // Receive 2 messages using individual ack mode only on first sub
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            if (i == 3 || i == 7) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 8 messages left still and restart broker
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, 
"clientId1", "sub2"), 3000, 500));
+        subscriber1.close();
+        subscriber2.close();
+        restartBroker(recoverIndex);
+
+        // Verify 8 messages exist in store on startup on sub 1 and 10 on sub 2
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, 
"clientId1", "sub2"), 3000, 500));
+
+        // Recreate subscriber and try and receive the other 8 messages
+        session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
+        subscriber1 = session.createDurableSubscriber(topic, "sub1");
+        subscriber2 = session.createDurableSubscriber(topic, "sub2");
+
+        for (int i = 1; i <= 2; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 4; i <= 6; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 8; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        // Make sure sub 2 gets all 10
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber2.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        subscriber1.close();
+        subscriber2.close();
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, 
"clientId1", "sub2"), 3000, 500));
+    }
+
+    @Test
+    public void multipleDurableTestRecoverSubscription() throws Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, 
"sub1");
+        TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, 
"sub2");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+
+        // Receive 2 messages using individual ack mode only on first sub
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            if (i == 3 || i == 7) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 8 messages left on sub 1 and 10 on sub2 and restart
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, 
"clientId1", "sub2"), 3000, 500));
+        subscriber1.close();
+        subscriber2.close();
+        restartBroker(recoverIndex);
+
+        //Manually recover subscription and verify proper messages are loaded
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) 
brokerTopic.getMessageStore();
+        final AtomicInteger sub1Recovered = new AtomicInteger();
+        final AtomicInteger sub2Recovered = new AtomicInteger();
+        store.recoverSubscription("clientId1", "sub1", new 
MessageRecoveryListener() {
+            @Override
+            public boolean recoverMessageReference(MessageId ref) throws 
Exception {
+                return false;
+            }
+
+            @Override
+            public boolean recoverMessage(Message message) throws Exception {
+                TextMessage textMessage = (TextMessage) message;
+                if (textMessage.getText().equals("msg: " + 3) || 
textMessage.getText().equals("msg: " + 7)) {
+                    throw new IllegalStateException("Got wrong message: " + 
textMessage.getText());
+                }
+                sub1Recovered.incrementAndGet();
+                return true;
+            }
+
+            @Override
+            public boolean isDuplicate(MessageId ref) {
+                return false;
+            }
+
+            @Override
+            public boolean hasSpace() {
+                return true;
+            }
+        });
+
+        store.recoverSubscription("clientId1", "sub2", new 
MessageRecoveryListener() {
+            @Override
+            public boolean recoverMessageReference(MessageId ref) throws 
Exception {
+                return false;
+            }
+
+            @Override
+            public boolean recoverMessage(Message message) throws Exception {
+                sub2Recovered.incrementAndGet();
+                return true;
+            }
+
+            @Override
+            public boolean isDuplicate(MessageId ref) {
+                return false;
+            }
+
+            @Override
+            public boolean hasSpace() {
+                return true;
+            }
+        });
+
+        //Verify proper number of messages are recovered
+        assertEquals(8, sub1Recovered.get());
+        assertEquals(10, sub2Recovered.get());
+    }
+
+    protected long getPendingMessageCount(ActiveMQTopic topic, String 
clientId, String subId) throws Exception {
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) 
brokerTopic.getMessageStore();
+        return store.getMessageCount(clientId, subId);
+    }
+}

Reply via email to