Repository: activemq
Updated Branches:
  refs/heads/trunk 350889c1e -> 135226533
https://issues.apache.org/jira/browse/AMQ-5394 - applied patch for kahadb lastUpdate tracking from Jesse Fugitt with thanks

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/13522653
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/13522653
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/13522653

Branch: refs/heads/trunk
Commit: 135226533f9c21e33fccd19df6b1ad506a15a1cb
Parents: 350889c
Author: gtully <gary.tu...@gmail.com>
Authored: Fri Oct 24 14:42:25 2014 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Fri Oct 24 14:42:25 2014 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/store/kahadb/MessageDatabase.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/13522653/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 88dde75..13e9b3c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1306,6 +1306,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
         addAckLocationForNewMessage(tx, sd, id);
     }
+    metadata.lastUpdate = location;
 } else {
     // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
     LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
@@ -1318,10 +1319,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         // added message. We don't want to assign it a new id as the other indexes would
         // be wrong..
         sd.locationIndex.put(tx, location, previous);
+        metadata.lastUpdate = location;
     }
     // record this id in any event, initial send or recovery
     metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
-    metadata.lastUpdate = location;
     return id;
 }

@@ -1355,10 +1356,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         if(previousKeys != null) {
             sd.locationIndex.remove(tx, previousKeys.location);
         }
+        metadata.lastUpdate = location;
     } else {
         LOG.warn("Non existent message update attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
     }
-    metadata.lastUpdate = location;
 }

 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
@@ -1372,6 +1373,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 if (keys != null) {
     sd.locationIndex.remove(tx, keys.location);
     recordAckMessageReferenceLocation(ackLocation, keys.location);
+    metadata.lastUpdate = ackLocation;
 } else if (LOG.isDebugEnabled()) {
     LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId());
 }
@@ -1398,12 +1400,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
             // The following method handles deleting un-referenced messages.
             removeAckLocation(tx, sd, subscriptionKey, sequence);
+            metadata.lastUpdate = ackLocation;
         } else if (LOG.isDebugEnabled()) {
             LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
         }

     }
-    metadata.lastUpdate = ackLocation;
 }

 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {