Repository: activemq Updated Branches: refs/heads/master a28a091c5 -> 1c4108545
https://issues.apache.org/jira/browse/AMQ-6277 - journal getNextLocation needs too passes to skip past if target is not initialized Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1c410854 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1c410854 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1c410854 Branch: refs/heads/master Commit: 1c4108545c1cdc7ba6017d11015d8233b1218e0f Parents: a28a091c Author: gtully <[email protected]> Authored: Wed May 4 22:09:06 2016 +0100 Committer: gtully <[email protected]> Committed: Wed May 4 22:09:06 2016 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 36 ++++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/1c410854/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 4a23cbc..7252bb9 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -111,6 +111,8 @@ import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET; + public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { protected BrokerService brokerService; @@ -625,12 +627,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { long start = System.currentTimeMillis(); - Location producerAuditPosition = recoverProducerAudit(); - Location ackMessageFileLocation = recoverAckMessageFileMap(); + Location afterProducerAudit = recoverProducerAudit(); + Location afterAckMessageFile = recoverAckMessageFileMap(); Location lastIndoubtPosition = getRecoveryPosition(); - Location recoveryPosition = startOfRecovery(producerAuditPosition, ackMessageFileLocation); - recoveryPosition = startOfRecovery(recoveryPosition, lastIndoubtPosition); + if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) { + // valid checkpoint, possible recover from afterAckMessageFile + afterProducerAudit = null; + } + Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile); + recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); if (recoveryPosition != null) { int redoCounter = 0; @@ -711,8 +717,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return TransactionIdConversion.convertToLocal(tx); } - private Location startOfRecovery(Location x, - Location y) { + private Location minimum(Location x, + Location y) { Location min = null; if (x != null) { min = x; @@ -720,8 +726,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe int compare = y.compareTo(x); if (compare < 0) { min = y; - } else if (compare == 0) { - min = null; // no recovery needed on a matched location } } } else { @@ -740,7 +744,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); - return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); + return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation); } catch (Exception e) { LOG.warn("Cannot recover message audit", e); return journal.getNextLocation(null); @@ -758,7 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); - return journal.getNextLocation(metadata.ackMessageFileMapLocation); + return getNextInitializedLocation(metadata.ackMessageFileMapLocation); } catch (Exception e) { LOG.warn("Cannot recover ackMessageFileMap", e); return journal.getNextLocation(null); @@ -986,13 +990,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Perhaps there were no transactions... if( metadata.lastUpdate!=null) { // Start replay at the record after the last one recorded in the index file. - return journal.getNextLocation(metadata.lastUpdate); + return getNextInitializedLocation(metadata.lastUpdate); } } // This loads the first position. return journal.getNextLocation(null); } + private Location getNextInitializedLocation(Location location) throws IOException { + Location mayNotBeInitialized = journal.getNextLocation(location); + if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) { + // need to init size and type to skip + return journal.getNextLocation(mayNotBeInitialized); + } else { + return mayNotBeInitialized; + } + } + protected void checkpointCleanup(final boolean cleanup) throws IOException { long start; this.indexLock.writeLock().lock();
