Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 32733fc22 -> cf1af38c6
QPID-7811: Ensure that last stored message is recovered by AsynchronousMessageStoreRecoverer

Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/cf1af38c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/cf1af38c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/cf1af38c

Branch: refs/heads/master
Commit: cf1af38c61727f6608c0505134137cbcd067ca6e
Parents: 32733fc
Author: Alex Rudyy <[email protected]>
Authored: Fri Jun 9 16:58:23 2017 +0100
Committer: Alex Rudyy <[email protected]>
Committed: Fri Jun 9 16:58:40 2017 +0100

----------------------------------------------------------------------
 .../AsynchronousMessageStoreRecoverer.java      |  2 +-
 .../AsynchronousMessageStoreRecovererTest.java  | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cf1af38c/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 31c20f6..777674b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -200,7 +200,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
         public boolean handle(final StoredMessage<?> storedMessage)
         {
             long messageNumber = storedMessage.getMessageNumber();
-            if ( _continueRecovery.get() && messageNumber < _maxMessageId - 1)
+            if ( _continueRecovery.get() && messageNumber < _maxMessageId)
             {
                 if (!_recoveredMessages.containsKey(messageNumber))
                 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cf1af38c/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
index c5a8a24..f8a0589 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
@@ -21,7 +21,9 @@
 package org.apache.qpid.server.virtualhost;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -35,8 +37,10 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.mockito.ArgumentMatcher;
 
 import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
@@ -112,7 +116,8 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
         when(_store.newTransaction()).thenReturn(mock(Transaction.class));
 
         final List<StoredMessage<?>> testMessages = new ArrayList<>();
-        testMessages.add(createTestMessage(1L));
+        StoredMessage<?> storedMessage = createTestMessage(1L);
+        testMessages.add(storedMessage);
         StoredMessage newMessage = createTestMessage(4L);
         testMessages.add(newMessage);
 
@@ -129,6 +134,19 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
         assertNull(result.get());
 
         verify(newMessage, times(0)).remove();
+        verify(queue).recover(argThat(new ArgumentMatcher<ServerMessage>()
+        {
+            @Override
+            public boolean matches(final Object argument)
+            {
+                if (argument instanceof ServerMessage)
+                {
+                    ServerMessage serverMessage = (ServerMessage)argument;
+                    return serverMessage.getMessageNumber() == storedMessage.getMessageNumber();
+                }
+                return false;
+            }
+        }), same(messageEnqueueRecord));
     }
 
     private StoredMessage<?> createTestMessage(final long messageNumber)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
