Reflect the need for the store to call messageId.setFutureOrSequenceLong for the journaled JDBC store
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e8f81551
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e8f81551
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e8f81551

Branch: refs/heads/trunk
Commit: e8f8155141e75ba10c1ba03d9031b6128a735507
Parents: 8a37f97
Author: gtully <[email protected]>
Authored: Sat Aug 30 23:51:59 2014 +0100
Committer: gtully <[email protected]>
Committed: Sat Aug 30 23:51:59 2014 +0100

----------------------------------------------------------------------
 .../activemq/store/journal/JournalMessageStore.java | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8f81551/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
index 08276d3..2d44769 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -88,7 +89,7 @@ public class JournalMessageStore extends AbstractMessageStore {
      * Not synchronized since the Journal has better throughput if you increase
      * the number of concurrent writes that it is doing.
      */
-    public void addMessage(ConnectionContext context, final Message message) throws IOException {
+    public void addMessage(final ConnectionContext context, final Message message) throws IOException {
 
         final MessageId id = message.getMessageId();
 
@@ -100,7 +101,7 @@ public class JournalMessageStore extends AbstractMessageStore {
             if (debug) {
                 LOG.debug("Journalled message add for: " + id + ", at: " + location);
             }
-            addMessage(message, location);
+            addMessage(context, message, location);
         } else {
             if (debug) {
                 LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
@@ -116,7 +117,7 @@ public class JournalMessageStore extends AbstractMessageStore {
                     }
                     synchronized (JournalMessageStore.this) {
                         inFlightTxLocations.remove(location);
-                        addMessage(message, location);
+                        addMessage(context, message, location);
                     }
                 }
 
@@ -133,11 +134,15 @@ public class JournalMessageStore extends AbstractMessageStore {
         }
     }
 
-    void addMessage(final Message message, final RecordLocation location) {
+    void addMessage(ConnectionContext context, final Message message, final RecordLocation location) {
         synchronized (this) {
             lastLocation = location;
             MessageId id = message.getMessageId();
             messages.put(id, message);
+            message.getMessageId().setFutureOrSequenceLong(0l);
+            if (indexListener != null) {
+                indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
+            }
         }
     }
 
