Repository: activemq Updated Branches: refs/heads/master ed395d1a8 -> a359d8152
[AMQ-6277] take account of producer audit not being updatated on recovery check, avoid unnecessary partial journal replay Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a359d815 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a359d815 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a359d815 Branch: refs/heads/master Commit: a359d8152cfee6f2fe95d34fd1b2296f6ed2670c Parents: ed395d1 Author: gtully <gary.tu...@gmail.com> Authored: Fri Sep 15 13:48:03 2017 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Fri Sep 15 13:48:03 2017 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 35 ++++++-------- .../kahadb/KahaDBPersistenceAdapterTest.java | 49 ++++++++++++++++++-- 2 files changed, 59 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a359d815/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 40e8f95..a6d3cc8 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -671,17 +671,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { long start = System.currentTimeMillis(); - Location afterProducerAudit = recoverProducerAudit(); - Location afterAckMessageFile = recoverAckMessageFileMap(); + boolean requiresJournalReplay = recoverProducerAudit(); + requiresJournalReplay |= recoverAckMessageFileMap(); Location lastIndoubtPosition = getRecoveryPosition(); - - if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) { - // valid checkpoint, possible recover from afterAckMessageFile - afterProducerAudit = null; - } - Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile); - recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); - + Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition; if (recoveryPosition != null) { int redoCounter = 0; int dataFileRotationTracker = recoveryPosition.getDataFileId(); @@ -784,7 +777,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return min; } - private Location recoverProducerAudit() throws IOException { + private boolean recoverProducerAudit() throws IOException { + boolean requiresReplay = true; if (metadata.producerSequenceIdTrackerLocation != null) { try { KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); @@ -794,33 +788,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); - return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation); + requiresReplay = false; } catch (Exception e) { LOG.warn("Cannot recover message audit", e); - return journal.getNextLocation(null); } - } else { - // got no audit stored so got to recreate via replay from start of the journal - return journal.getNextLocation(null); } + // got no audit stored so got to recreate via replay from start of the journal + return requiresReplay; } @SuppressWarnings("unchecked") - private Location recoverAckMessageFileMap() throws IOException { + private boolean recoverAckMessageFileMap() throws IOException { + boolean requiresReplay = true; if (metadata.ackMessageFileMapLocation != null) { try { KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); - return getNextInitializedLocation(metadata.ackMessageFileMapLocation); + requiresReplay = false; } catch (Exception e) { LOG.warn("Cannot recover ackMessageFileMap", e); - return journal.getNextLocation(null); } - } else { - // got no ackMessageFileMap stored so got to recreate via replay from start of the journal - return journal.getNextLocation(null); } + // got no ackMessageFileMap stored so got to recreate via replay from start of the journal + return requiresReplay; } protected void recoverIndex(Transaction tx) throws IOException { http://git-wip-us.apache.org/repos/asf/activemq/blob/a359d815/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java index c45c3e5..f509011 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java @@ -16,11 +16,18 @@ */ package org.apache.activemq.store.kahadb; -import java.io.File; -import java.io.IOException; - +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapterTestSupport; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -36,4 +43,40 @@ public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport } return kaha; } + + public void testNoReplayOnStop() throws Exception { + brokerService.getPersistenceAdapter().checkpoint(true); + brokerService.stop(); + + final AtomicBoolean gotSomeReplay = new AtomicBoolean(Boolean.FALSE); + final AtomicBoolean trappedLogMessages = new AtomicBoolean(Boolean.FALSE); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + trappedLogMessages.set(true); + if (event.getLevel().equals(Level.INFO)) { + if (event.getMessage().toString().contains("Recovery replayed ")) { + gotSomeReplay.set(true); + } + } + } + }; + + try { + Logger.getLogger(MessageDatabase.class.getName()).addAppender(appender); + Logger.getLogger(MessageDatabase.class.getName()).setLevel(Level.INFO); + + brokerService = new BrokerService(); + pa = createPersistenceAdapter(false); + brokerService.setPersistenceAdapter(pa); + brokerService.start(); + + } finally { + Logger.getRootLogger().removeAppender(appender); + Logger.getLogger(MessageDatabase.class.getName()).removeAppender(appender); + } + assertTrue("log capture working", trappedLogMessages.get()); + assertFalse("no replay message in the log", gotSomeReplay.get()); + } }