Repository: activemq Updated Branches: refs/heads/master 3ac3a420a -> 7c890d477
AMQ-7067 - test and fix for eager ack compaction moving acks from data files with in progress tx and clearing the link from the prepare record, that now uses the same ack map Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c890d47 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c890d47 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c890d47 Branch: refs/heads/master Commit: 7c890d477663d91aef518e30d60cf3c13827877a Parents: 3ac3a42 Author: gtully <[email protected]> Authored: Fri Oct 12 14:51:37 2018 +0100 Committer: gtully <[email protected]> Committed: Fri Oct 12 14:51:37 2018 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 27 +++++- .../org/apache/activemq/bugs/AMQ7067Test.java | 90 +++++++++++++++++++- 2 files changed, 113 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7c890d47/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index d231a86..dfb40ec 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -65,7 +65,6 @@ import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Topic; -import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.TransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; @@ -2037,7 +2036,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } // Check if we found one, or if we only found the current file being written to. - if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { + if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) { return; } @@ -2077,8 +2076,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + // called with the index lock held + private boolean blockedFromCompaction(int journalToAdvance) { + // don't forward the current data file + if (journalToAdvance == journal.getCurrentDataFileId()) { + return true; + } + // don't forward any data file with inflight transaction records because it will whack the tx - data file link + // in the ack map when all acks are migrated (now that the ack map is not just for acks) + // TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated + // as part of the forward work. + Location[] inProgressTxRange = getInProgressTxLocationRange(); + if (inProgressTxRange[0] != null) { + for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { + if (journalToAdvance == pendingTx) { + LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange); + return true; + } + } + } + return false; + } + private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException { - LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead); + LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced); DataFile forwardsFile = journal.reserveDataFile(); forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); http://git-wip-us.apache.org/repos/asf/activemq/blob/7c890d47/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java index d00ee41..4997632 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java @@ -13,9 +13,12 @@ import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MessageDatabase; import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.Wait; import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -114,6 +117,82 @@ public class AMQ7067Test { ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge(); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == getQueueSize(queue.getQueueName()); + } + }); + + // force gc + broker.getPersistenceAdapter().checkpoint(true); + + Xid[] xids = xaRes.recover(TMSTARTRSCAN); + + //Should be 1 since we have only 1 prepared + assertEquals(1, xids.length); + connection.close(); + + broker.stop(); + broker.waitUntilStopped(); + createBroker(); + + setupXAConnection(); + xids = xaRes.recover(TMSTARTRSCAN); + + System.out.println("****** recovered = " + xids); + + // THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION! + assertEquals(1, xids.length); + } + + @Test + public void testXAPrepareWithAckCompactionDoesNotLooseInflight() throws Exception { + + // investigate liner gc issue - store usage not getting released + org.apache.log4j.Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE); + + + setupXAConnection(); + + Queue holdKahaDb = xaSession.createQueue("holdKahaDb"); + + MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb); + + XATransactionId txid = createXATransaction(); + System.out.println("****** create new txid = " + txid); + xaRes.start(txid, TMNOFLAGS); + + TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10)); + holdKahaDbProducer.send(helloMessage); + xaRes.end(txid, TMSUCCESS); + + Queue queue = xaSession.createQueue("test"); + + produce(xaRes, xaSession, queue, 100, 512 * 1024); + + xaRes.prepare(txid); + + produce(xaRes, xaSession, queue, 100, 512 * 1024); + + ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == getQueueSize(queue.getQueueName()); + } + }); + + // force gc, two data files requires two cycles + int limit = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1; + for (int i=0; i<limit*2; i++) { + broker.getPersistenceAdapter().checkpoint(true); + } + + // ack compaction task operates in the background + TimeUnit.SECONDS.sleep(5); + Xid[] xids = xaRes.recover(TMSTARTRSCAN); //Should be 1 since we have only 1 prepared @@ -160,6 +239,16 @@ public class AMQ7067Test { ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge(); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == getQueueSize(queue.getQueueName()); + } + }); + + // force gc + broker.getPersistenceAdapter().checkpoint(true); + Xid[] xids = xaRes.recover(TMSTARTRSCAN); //Should be 1 since we have only 1 prepared @@ -343,7 +432,6 @@ public class AMQ7067Test { TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", messageSize)); producer.send(helloMessage); xaRes.end(txid, TMSUCCESS); - xaRes.prepare(txid); xaRes.commit(txid, true); } }
