https://issues.apache.org/jira/browse/AMQ-5277 - JDBC store: make use of the entryLocator on ack
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3b5d89a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3b5d89a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3b5d89a7

Branch: refs/heads/trunk
Commit: 3b5d89a78b12e63f998f239a155f71a557a87ea9
Parents: a9b8d98
Author: gtully <gary.tu...@gmail.com>
Authored: Mon Jul 14 16:58:47 2014 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Mon Jul 14 17:02:19 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/activemq/broker/region/BaseDestination.java | 5 +++--
 .../java/org/apache/activemq/store/jdbc/JDBCMessageStore.java | 5 ++++-
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/3b5d89a7/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index c3841c8..03513aa 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -782,10 +782,11 @@ public abstract class BaseDestination implements Destination {
             ack.copy(a);
             ack = a;
             // Convert to non-ranged.
-            ack.setFirstMessageId(node.getMessageId());
-            ack.setLastMessageId(node.getMessageId());
             ack.setMessageCount(1);
         }
+        // always use node messageId so we can access entry/data Location
+        ack.setFirstMessageId(node.getMessageId());
+        ack.setLastMessageId(node.getMessageId());
         return ack;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3b5d89a7/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 4badb09..c3d5594 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -237,7 +237,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
-        long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
+        long seq = ack.getLastMessageId().getEntryLocator() != null ?
+                (Long) ack.getLastMessageId().getEntryLocator() :
+                persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
         // Get a connection and remove the message from the DB
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
@@ -339,6 +341,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
             Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
             msg.getMessageId().setBrokerSequenceId(sequenceId);
+            msg.getMessageId().setEntryLocator(sequenceId);
             listener.recoverMessage(msg);
             lastRecoveredSequenceId.set(sequenceId);
             lastRecoveredPriority.set(msg.getPriority());