This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.15.x in repository https://gitbox.apache.org/repos/asf/activemq.git
commit ef0ec42885dc82bb31f266cc6cfe3c4065438453 Author: gtully <[email protected]> AuthorDate: Mon Jun 10 15:31:23 2019 +0100 AMQ-7225 - defer cleanup task operation till recovery processing complete, track prepared location in recovered ops to ensure they are retained on recovery failure. Fix and test (cherry picked from commit 93e726d6a7ba9ed44f5440369f8f9f1b41f49373) --- .../kahadb/MultiKahaDBPersistenceAdapter.java | 10 +- .../store/kahadb/MultiKahaDBTransactionStore.java | 23 ++- .../store/kahadb/disk/journal/Journal.java | 11 +- .../activemq/bugs/MKahaDBTxRecoveryTest.java | 224 +++++++++++++++++++++ 4 files changed, 262 insertions(+), 6 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index 4bdb8de..56e5e92 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -56,7 +56,6 @@ import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.TransactionIdTransformerAware; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; -import org.apache.activemq.usage.StoreUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOHelper; @@ -554,6 +553,15 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem return transactionStore.getJournalMaxWriteBatchSize(); } + + public void setJournalCleanupInterval(long journalCleanupInterval) { + transactionStore.setJournalCleanupInterval(journalCleanupInterval); + } + + public long getJournalCleanupInterval() { + return transactionStore.getJournalCleanupInterval(); + } + public List<PersistenceAdapter> getAdapters() { return Collections.unmodifiableList(adapters); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java index ff70076..5befa92 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java @@ -66,6 +66,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore { private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean recovered = new AtomicBoolean(false); + private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL; public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) { this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter; @@ -188,6 +190,14 @@ public class MultiKahaDBTransactionStore implements TransactionStore { this.journalWriteBatchSize = journalWriteBatchSize; } + public void setJournalCleanupInterval(long journalCleanupInterval) { + this.journalCleanupInterval = journalCleanupInterval; + } + + public long getJournalCleanupInterval() { + return journalCleanupInterval; + } + public class Tx { private final Set<TransactionStore> stores = new HashSet<TransactionStore>(); private int prepareLocationId = 0; @@ -308,14 +318,19 @@ public class MultiKahaDBTransactionStore implements TransactionStore { journal.setDirectory(getDirectory()); journal.setMaxFileLength(journalMaxFileLength); journal.setWriteBatchSize(journalWriteBatchSize); + journal.setCleanupInterval(journalCleanupInterval); IOHelper.mkdirs(journal.getDirectory()); journal.start(); recoverPendingLocalTransactions(); + recovered.set(true); store(new KahaTraceCommand().setMessage("LOADED " + new Date())); } } private void txStoreCleanup() { + if (!recovered.get()) { + return; + } Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet()); for (Tx tx : inflightTransactions.values()) { knownDataFileIds.remove(tx.getPreparedLocationId()); @@ -342,7 +357,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { private void recoverPendingLocalTransactions() throws IOException { Location location = journal.getNextLocation(null); while (location != null) { - process(load(location)); + process(location, load(location)); location = journal.getNextLocation(location); } recoveredPendingCommit.addAll(inflightTransactions.keySet()); @@ -361,11 +376,11 @@ public class MultiKahaDBTransactionStore implements TransactionStore { return message; } - public void process(JournalCommand<?> command) throws IOException { + public void process(final Location location, JournalCommand<?> command) throws IOException { switch (command.type()) { case KAHA_PREPARE_COMMAND: KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command; - getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())); + getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location); break; case KAHA_COMMIT_COMMAND: KahaCommitCommand commitCommand = (KahaCommitCommand) command; @@ -405,7 +420,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { if (recoveredPendingCommit.contains(txid)) { LOG.info("delivering pending commit outcome for tid: " + txid); broker.commitTransaction(null, txid, false); - + recoveredPendingCommit.remove(txid); } else { LOG.info("delivering rollback outcome to store for tid: " + txid); broker.forgetTransaction(null, txid); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java index 67d4c86..9a6e256 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java @@ -124,6 +124,14 @@ public class Journal { } } + public void setCleanupInterval(long cleanupInterval) { + this.cleanupInterval = cleanupInterval; + } + + public long getCleanupInterval() { + return cleanupInterval; + } + public enum PreallocationStrategy { SPARSE_FILE, OS_KERNEL_COPY, @@ -230,6 +238,7 @@ public class Journal { protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; private File osKernelCopyTemplateFile = null; private ByteBuffer preAllocateDirectBuffer = null; + private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL; protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; @@ -345,7 +354,7 @@ public class Journal { public void run() { cleanup(); } - }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS); + }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS); long end = System.currentTimeMillis(); LOG.trace("Startup took: "+(end-start)+" ms"); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java new file mode 100644 index 0000000..4a7e9c6 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TransactionIdTransformer; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class MKahaDBTxRecoveryTest { + + static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class); + private final static int maxFileLength = 1024*1024*32; + + private final static String PREFIX_DESTINATION_NAME = "queue"; + + private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test"; + private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test"; + private final static int CLEANUP_INTERVAL_MILLIS = 500; + + BrokerService broker; + private List<KahaDBPersistenceAdapter> kahadbs = new LinkedList<KahaDBPersistenceAdapter>(); + + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(true); + broker.setBrokerName("localhost"); + broker.setPersistenceAdapter(kaha); + return broker; + } + + @Test + public void testCommitOutcomeDeliveryOnRecovery() throws Exception { + + prepareBrokerWithMultiStore(true); + broker.start(); + broker.waitUntilStarted(); + + + // Ensure we have an Admin View. + assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return (broker.getAdminView()) != null; + } + })); + + + final AtomicBoolean injectFailure = new AtomicBoolean(true); + + final AtomicInteger reps = new AtomicInteger(); + final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>(); + + TransactionIdTransformer faultInjector = new TransactionIdTransformer() { + @Override + public TransactionId transform(TransactionId txid) { + if (injectFailure.get() && reps.incrementAndGet() > 5) { + throw new RuntimeException("Bla"); + } + return delegate.get().transform(txid); + } + }; + // set up kahadb to fail after N ops + for (KahaDBPersistenceAdapter pa : kahadbs) { + if (delegate.get() == null) { + delegate.set(pa.getStore().getTransactionIdTransformer()); + } + pa.setTransactionIdTransformer(faultInjector); + } + + ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost"); + f.setAlwaysSyncSend(true); + Connection c = f.createConnection(); + c.start(); + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2)); + producer.send(s.createTextMessage("HI")); + try { + s.commit(); + } catch (Exception expected) { + expected.printStackTrace(); + } + + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME))); + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2))); + + final Destination destination1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)); + final Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)); + + assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount(); + } + })); + + // check completion on recovery + injectFailure.set(false); + + // fire in many more local transactions to use N txStore journal files + for (int i=0; i<100; i++) { + producer.send(s.createTextMessage("HI")); + s.commit(); + } + + broker.stop(); + + // fail recovery processing on first attempt + prepareBrokerWithMultiStore(false); + broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() { + + @Override + public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { + // longer than CleanupInterval + TimeUnit.SECONDS.sleep( 2); + throw new RuntimeException("Sorry"); + } + }}); + broker.start(); + + // second recovery attempt should sort it + broker.stop(); + prepareBrokerWithMultiStore(false); + broker.start(); + broker.waitUntilStarted(); + + // verify commit completed + Destination destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)); + assertEquals(101, destination.getMessageStore().getMessageCount()); + + destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)); + assertEquals(101, destination.getMessageStore().getMessageCount()); + } + + + protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { + KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); + kaha.setJournalMaxFileLength(maxFileLength); + kaha.setCleanupInterval(CLEANUP_INTERVAL_MILLIS); + if (delete) { + kaha.deleteAllMessages(); + } + kahadbs.add(kaha); + return kaha; + } + + public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { + + MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); + if (deleteAllMessages) { + multiKahaDBPersistenceAdapter.deleteAllMessages(); + } + ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>(); + + adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages)); + adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages)); + + multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); + multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024); + multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS); + + broker = createBroker(multiKahaDBPersistenceAdapter); + } + + private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages) + throws IOException { + FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); + template.setPersistenceAdapter(createStore(deleteAllMessages)); + if (destinationPrefix != null) { + template.setQueue(destinationPrefix + ".>"); + } + return template; + } +} \ No newline at end of file
