This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit ef0ec42885dc82bb31f266cc6cfe3c4065438453
Author: gtully <[email protected]>
AuthorDate: Mon Jun 10 15:31:23 2019 +0100

    AMQ-7225 - defer the cleanup task until recovery processing is complete, and
    track the prepared location in recovered ops to ensure they are retained on
    recovery failure. Fix and test.
    
    (cherry picked from commit 93e726d6a7ba9ed44f5440369f8f9f1b41f49373)
---
 .../kahadb/MultiKahaDBPersistenceAdapter.java      |  10 +-
 .../store/kahadb/MultiKahaDBTransactionStore.java  |  23 ++-
 .../store/kahadb/disk/journal/Journal.java         |  11 +-
 .../activemq/bugs/MKahaDBTxRecoveryTest.java       | 224 +++++++++++++++++++++
 4 files changed, 262 insertions(+), 6 deletions(-)

diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 4bdb8de..56e5e92 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -56,7 +56,6 @@ import org.apache.activemq.store.TransactionIdTransformer;
 import org.apache.activemq.store.TransactionIdTransformerAware;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -554,6 +553,15 @@ public class MultiKahaDBPersistenceAdapter extends 
LockableServiceSupport implem
         return transactionStore.getJournalMaxWriteBatchSize();
     }
 
+
+    public void setJournalCleanupInterval(long journalCleanupInterval) {
+        transactionStore.setJournalCleanupInterval(journalCleanupInterval);
+    }
+
+    public long getJournalCleanupInterval() {
+        return transactionStore.getJournalCleanupInterval();
+    }
+
     public List<PersistenceAdapter> getAdapters() {
         return Collections.unmodifiableList(adapters);
     }
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index ff70076..5befa92 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -66,6 +66,8 @@ public class MultiKahaDBTransactionStore implements 
TransactionStore {
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean recovered = new AtomicBoolean(false);
+    private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
 
     public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter 
multiKahaDBPersistenceAdapter) {
         this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
@@ -188,6 +190,14 @@ public class MultiKahaDBTransactionStore implements 
TransactionStore {
         this.journalWriteBatchSize = journalWriteBatchSize;
     }
 
+    public void setJournalCleanupInterval(long journalCleanupInterval) {
+        this.journalCleanupInterval = journalCleanupInterval;
+    }
+
+    public long getJournalCleanupInterval() {
+        return journalCleanupInterval;
+    }
+
     public class Tx {
         private final Set<TransactionStore> stores = new 
HashSet<TransactionStore>();
         private int prepareLocationId = 0;
@@ -308,14 +318,19 @@ public class MultiKahaDBTransactionStore implements 
TransactionStore {
             journal.setDirectory(getDirectory());
             journal.setMaxFileLength(journalMaxFileLength);
             journal.setWriteBatchSize(journalWriteBatchSize);
+            journal.setCleanupInterval(journalCleanupInterval);
             IOHelper.mkdirs(journal.getDirectory());
             journal.start();
             recoverPendingLocalTransactions();
+            recovered.set(true);
             store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
         }
     }
 
     private void txStoreCleanup() {
+        if (!recovered.get()) {
+            return;
+        }
         Set<Integer> knownDataFileIds = new 
TreeSet<Integer>(journal.getFileMap().keySet());
         for (Tx tx : inflightTransactions.values()) {
             knownDataFileIds.remove(tx.getPreparedLocationId());
@@ -342,7 +357,7 @@ public class MultiKahaDBTransactionStore implements 
TransactionStore {
     private void recoverPendingLocalTransactions() throws IOException {
         Location location = journal.getNextLocation(null);
         while (location != null) {
-            process(load(location));
+            process(location, load(location));
             location = journal.getNextLocation(location);
         }
         recoveredPendingCommit.addAll(inflightTransactions.keySet());
@@ -361,11 +376,11 @@ public class MultiKahaDBTransactionStore implements 
TransactionStore {
         return message;
     }
 
-    public void process(JournalCommand<?> command) throws IOException {
+    public void process(final Location location, JournalCommand<?> command) 
throws IOException {
         switch (command.type()) {
             case KAHA_PREPARE_COMMAND:
                 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) 
command;
-                
getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
+                
getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location);
                 break;
             case KAHA_COMMIT_COMMAND:
                 KahaCommitCommand commitCommand = (KahaCommitCommand) command;
@@ -405,7 +420,7 @@ public class MultiKahaDBTransactionStore implements 
TransactionStore {
                         if (recoveredPendingCommit.contains(txid)) {
                             LOG.info("delivering pending commit outcome for 
tid: " + txid);
                             broker.commitTransaction(null, txid, false);
-
+                            recoveredPendingCommit.remove(txid);
                         } else {
                             LOG.info("delivering rollback outcome to store for 
tid: " + txid);
                             broker.forgetTransaction(null, txid);
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 67d4c86..9a6e256 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -124,6 +124,14 @@ public class Journal {
         }
     }
 
+    public void setCleanupInterval(long cleanupInterval) {
+        this.cleanupInterval = cleanupInterval;
+    }
+
+    public long getCleanupInterval() {
+        return cleanupInterval;
+    }
+
     public enum PreallocationStrategy {
         SPARSE_FILE,
         OS_KERNEL_COPY,
@@ -230,6 +238,7 @@ public class Journal {
     protected PreallocationStrategy preallocationStrategy = 
PreallocationStrategy.SPARSE_FILE;
     private File osKernelCopyTemplateFile = null;
     private ByteBuffer preAllocateDirectBuffer = null;
+    private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
 
     protected JournalDiskSyncStrategy journalDiskSyncStrategy = 
JournalDiskSyncStrategy.ALWAYS;
 
@@ -345,7 +354,7 @@ public class Journal {
             public void run() {
                 cleanup();
             }
-        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, 
TimeUnit.MILLISECONDS);
+        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
 
         long end = System.currentTimeMillis();
         LOG.trace("Startup took: "+(end-start)+" ms");
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
new file mode 100644
index 0000000..4a7e9c6
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class MKahaDBTxRecoveryTest {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class);
+    private final static int maxFileLength = 1024*1024*32;
+
+    private final static String PREFIX_DESTINATION_NAME = "queue";
+
+    private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + 
".test";
+    private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + 
"2.test";
+    private final static int CLEANUP_INTERVAL_MILLIS = 500;
+
+    BrokerService broker;
+    private List<KahaDBPersistenceAdapter> kahadbs = new 
LinkedList<KahaDBPersistenceAdapter>();
+
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected BrokerService createBroker(PersistenceAdapter kaha) throws 
Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.setBrokerName("localhost");
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    @Test
+    public void testCommitOutcomeDeliveryOnRecovery() throws Exception {
+
+        prepareBrokerWithMultiStore(true);
+        broker.start();
+        broker.waitUntilStarted();
+
+
+        // Ensure we have an Admin View.
+        assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return (broker.getAdminView()) != null;
+            }
+        }));
+
+
+        final AtomicBoolean injectFailure = new AtomicBoolean(true);
+
+        final AtomicInteger reps = new AtomicInteger();
+        final AtomicReference<TransactionIdTransformer> delegate = new 
AtomicReference<TransactionIdTransformer>();
+
+        TransactionIdTransformer faultInjector  = new 
TransactionIdTransformer() {
+            @Override
+            public TransactionId transform(TransactionId txid) {
+                if (injectFailure.get() && reps.incrementAndGet() > 5) {
+                    throw new RuntimeException("Bla");
+                }
+                return delegate.get().transform(txid);
+            }
+        };
+        // set up kahadb to fail after N ops
+        for (KahaDBPersistenceAdapter pa : kahadbs) {
+            if (delegate.get() == null) {
+                delegate.set(pa.getStore().getTransactionIdTransformer());
+            }
+            pa.setTransactionIdTransformer(faultInjector);
+        }
+
+        ActiveMQConnectionFactory f = new 
ActiveMQConnectionFactory("vm://localhost");
+        f.setAlwaysSyncSend(true);
+        Connection c = f.createConnection();
+        c.start();
+        Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = s.createProducer(new 
ActiveMQQueue(DESTINATION_NAME  + "," + DESTINATION_NAME_2));
+        producer.send(s.createTextMessage("HI"));
+        try {
+            s.commit();
+        } catch (Exception expected) {
+            expected.printStackTrace();
+        }
+
+        assertNotNull(broker.getDestination(new 
ActiveMQQueue(DESTINATION_NAME)));
+        assertNotNull(broker.getDestination(new 
ActiveMQQueue(DESTINATION_NAME_2)));
+
+        final Destination destination1 = broker.getDestination(new 
ActiveMQQueue(DESTINATION_NAME));
+        final Destination destination2 = broker.getDestination(new 
ActiveMQQueue(DESTINATION_NAME_2));
+
+        assertTrue("Partial commit - one dest has message", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destination2.getMessageStore().getMessageCount() != 
destination1.getMessageStore().getMessageCount();
+            }
+        }));
+
+        // check completion on recovery
+        injectFailure.set(false);
+
+        // fire in many more local transactions to use N txStore journal files
+        for (int i=0; i<100; i++) {
+            producer.send(s.createTextMessage("HI"));
+            s.commit();
+        }
+
+        broker.stop();
+
+        // fail recovery processing on first attempt
+        prepareBrokerWithMultiStore(false);
+        broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+
+            @Override
+            public void commitTransaction(ConnectionContext context, 
TransactionId xid, boolean onePhase) throws Exception {
+                // longer than CleanupInterval
+                TimeUnit.SECONDS.sleep( 2);
+                throw new RuntimeException("Sorry");
+            }
+        }});
+        broker.start();
+
+        // second recovery attempt should sort it
+        broker.stop();
+        prepareBrokerWithMultiStore(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+        // verify commit completed
+        Destination destination = broker.getDestination(new 
ActiveMQQueue(DESTINATION_NAME));
+        assertEquals(101, destination.getMessageStore().getMessageCount());
+
+        destination = broker.getDestination(new 
ActiveMQQueue(DESTINATION_NAME_2));
+        assertEquals(101, destination.getMessageStore().getMessageCount());
+    }
+
+
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws 
IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(maxFileLength);
+        kaha.setCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        kahadbs.add(kaha);
+        return kaha;
+    }
+
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws 
Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new 
MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new 
ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        
adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, 
deleteAllMessages));
+        
adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + 
"2", deleteAllMessages));
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
+        
multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+
+        broker = createBroker(multiKahaDBPersistenceAdapter);
+    }
+
+       private FilteredKahaDBPersistenceAdapter 
createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean 
deleteAllMessages)
+                       throws IOException {
+               FilteredKahaDBPersistenceAdapter template = new 
FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        if (destinationPrefix != null) {
+            template.setQueue(destinationPrefix + ".>");
+        }
+               return template;
+       }
+}
\ No newline at end of file

Reply via email to