Repository: qpid-cpp Updated Branches: refs/heads/master 09324a7b1 -> 3b9b412e6
QPID-7406: reset cursors for consumers if message is released Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/3b9b412e Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/3b9b412e Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/3b9b412e Branch: refs/heads/master Commit: 3b9b412e6519d7e653a2b75dfdcbcb090af1d92d Parents: 09324a7 Author: Gordon Sim <g...@redhat.com> Authored: Mon Aug 29 13:27:00 2016 +0100 Committer: Gordon Sim <g...@redhat.com> Committed: Mon Aug 29 13:27:11 2016 +0100 ---------------------------------------------------------------------- src/qpid/broker/MessageMap.cpp | 8 ++++++- src/qpid/broker/MessageMap.h | 1 + src/tests/MessagingSessionTests.cpp | 41 ++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/3b9b412e/src/qpid/broker/MessageMap.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/broker/MessageMap.cpp b/src/qpid/broker/MessageMap.cpp index 4cdd83c..c996236 100644 --- a/src/qpid/broker/MessageMap.cpp +++ b/src/qpid/broker/MessageMap.cpp @@ -82,10 +82,15 @@ Message* MessageMap::find(const framing::SequenceNumber& position, QueueCursor* } } +bool MessageMap::reset(const QueueCursor& cursor) +{ + return !cursor.valid || (cursor.type == CONSUMER && cursor.version != version); +} + Message* MessageMap::next(QueueCursor& cursor) { Ordering::iterator i; - if (!cursor.valid) i = messages.begin(); //start with oldest message + if (reset(cursor)) i = messages.begin(); //start with oldest message else i = messages.upper_bound(cursor.position); //get first message that is greater than position while (i != messages.end()) { @@ -137,6 +142,7 @@ Message* MessageMap::release(const QueueCursor& cursor) Ordering::iterator i = messages.find(cursor.position); if (i != 
messages.end()) { i->second.setState(AVAILABLE); + version++; return &i->second; } else { return 0; http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/3b9b412e/src/qpid/broker/MessageMap.h ---------------------------------------------------------------------- diff --git a/src/qpid/broker/MessageMap.h b/src/qpid/broker/MessageMap.h index c30606d..600ad62 100644 --- a/src/qpid/broker/MessageMap.h +++ b/src/qpid/broker/MessageMap.h @@ -65,6 +65,7 @@ class MessageMap : public Messages std::string getKey(const Message&); virtual const Message& replace(const Message&, const Message&); void erase(Ordering::iterator); + bool reset(const QueueCursor& cursor); }; }} // namespace qpid::broker http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/3b9b412e/src/tests/MessagingSessionTests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/MessagingSessionTests.cpp b/src/tests/MessagingSessionTests.cpp index 2a953d2..c6d8c39 100644 --- a/src/tests/MessagingSessionTests.cpp +++ b/src/tests/MessagingSessionTests.cpp @@ -1653,6 +1653,47 @@ QPID_AUTO_TEST_CASE(testPriorityRingEviction) BOOST_CHECK(!receiver.fetch(msg, Duration::IMMEDIATE)); } +QPID_AUTO_TEST_CASE(testReleaseResetsCursor) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + send(sender, 10); + Receiver r1 = fix.session.createReceiver(fix.queue); + Receiver r2 = fix.session.createReceiver(fix.queue); + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::IMMEDIATE)); + BOOST_CHECK_EQUAL(m1.getContent(), "Message_1"); + for (uint i = 1; i < 10; i++) { + Message msg; + BOOST_CHECK(r2.fetch(msg, Duration::IMMEDIATE)); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + } + fix.session.release(m1); + Message msg; + BOOST_CHECK(r2.fetch(msg, Duration::IMMEDIATE)); + BOOST_CHECK_EQUAL(msg.getContent(), "Message_1"); + fix.session.acknowledge(); +} + +QPID_AUTO_TEST_CASE(testReleaseResetsCursorForLVQ) +{ + 
MessagingFixture fix; + std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.last_value_queue_key:qpid.subject}}}}"); + Sender sender = fix.session.createSender(queue); + sender.send(Message("please release me")); + Receiver r1 = fix.session.createReceiver(queue); + Receiver r2 = fix.session.createReceiver(queue); + Message m1; + Message m2; + BOOST_CHECK(r1.fetch(m1, Duration::IMMEDIATE)); + BOOST_CHECK_EQUAL(m1.getContent(), "please release me"); + BOOST_CHECK(!r2.fetch(m2, Duration::IMMEDIATE)); + fix.session.release(m1); + BOOST_CHECK(r2.fetch(m2, Duration::SECOND*5)); + BOOST_CHECK_EQUAL(m2.getContent(), "please release me"); + fix.session.acknowledge(); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org