Repository: activemq Updated Branches: refs/heads/master 86c826c46 -> fcabcd282
https://issues.apache.org/jira/browse/AMQ-5960 - rework fix to reset the next sequence so that the next ack position and message reference get cleared up in normal operation Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fcabcd28 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fcabcd28 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fcabcd28 Branch: refs/heads/master Commit: fcabcd282dc01c6faaff3c7627882f42a64543b4 Parents: 86c826c Author: gtully <[email protected]> Authored: Wed Sep 23 13:15:29 2015 +0100 Committer: gtully <[email protected]> Committed: Wed Sep 23 13:15:29 2015 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 21 +++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/fcabcd28/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 815b9df..3512190 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1319,7 +1319,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Add the message. int priority = command.getPrioritySupported() ? 
command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; - long id = sd.orderIndex.getNextMessageId(priority); + long id = sd.orderIndex.getNextMessageId(); Long previous = sd.locationIndex.put(tx, location, id); if (previous == null) { previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); @@ -1346,16 +1346,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // added message. We don't want to assign it a new id as the other indexes would // be wrong.. sd.locationIndex.put(tx, location, previous); + // ensure sequence is not broken + sd.orderIndex.revertNextMessageId(); metadata.lastUpdate = location; - // remove ack positions - if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { - Iterator<Entry<String, SequenceSet>> it = sd.ackPositions.iterator(tx); - while (it.hasNext()) { - Entry<String, SequenceSet> entry = it.next(); - entry.getValue().remove(id); - } - } - } // record this id in any event, initial send or recovery metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); @@ -1443,7 +1436,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe removeAckLocation(command, tx, sd, subscriptionKey, sequence); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { - LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); + LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); } } @@ -3183,10 +3176,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe deletes.add(iterator.next()); } - long getNextMessageId(int priority) { + long getNextMessageId() { return nextMessageId++; } + void revertNextMessageId() { + nextMessageId--; + } + MessageKeys get(Transaction tx, Long key) throws IOException { MessageKeys result = defaultPriorityIndex.get(tx, key); if (result == null) {
