Author: gsim
Date: Mon Feb 21 17:30:17 2011
New Revision: 1073085
URL: http://svn.apache.org/viewvc?rev=1073085&view=rev
Log:
QPID-3051: Ensure credit window is moved correctly even if it contains rejected
messages.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1073085&r1=1073084&r2=1073085&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Feb 21 17:30:17
2011
@@ -131,18 +131,20 @@ void DeliveryRecord::committed() const{
void DeliveryRecord::reject()
{
- Exchange::shared_ptr alternate = queue->getAlternateExchange();
- if (alternate) {
- DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(),
msg.payload->getApplicationHeaders());
- QPID_LOG(info, "Routed rejected message from " << queue->getName() <<
" to "
- << alternate->getName());
- } else {
- //just drop it
- QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+ if (acquired && !ended) {
+ Exchange::shared_ptr alternate = queue->getAlternateExchange();
+ if (alternate) {
+ DeliverableMessage delivery(msg.payload);
+ alternate->route(delivery, msg.payload->getRoutingKey(),
msg.payload->getApplicationHeaders());
+ QPID_LOG(info, "Routed rejected message from " << queue->getName()
<< " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " <<
queue->getName());
+ }
+ dequeue();
+ setEnded();
}
-
- dequeue();
}
uint32_t DeliveryRecord::getCredit() const
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1073085&r1=1073084&r2=1073085&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Feb 21 17:30:17
2011
@@ -697,8 +697,11 @@ void SemanticState::reject(DeliveryId fi
{
AckRange range = findRange(first, last);
for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
- //need to remove the delivery records as well
- unacked.erase(range.start, range.end);
+ //may need to remove the delivery records as well
+ for (DeliveryRecords::iterator i = range.start; i != unacked.end() &&
i->getId() <= last; ) {
+ if (i->isRedundant()) i = unacked.erase(i);
+ else i++;
+ }
}
bool SemanticState::ConsumerImpl::doOutput()
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1073085&r1=1073084&r2=1073085&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Feb 21 17:30:17
2011
@@ -937,6 +937,47 @@ QPID_AUTO_TEST_CASE(testQmfCreateAndDele
}
}
+QPID_AUTO_TEST_CASE(testRejectAndCredit)
+{
+ //Ensure credit is restored on completing rejected messages
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+
+ const uint count(10);
+ receiver.setCapacity(count);
+ for (uint i = 0; i < count; i++) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+
+ Message in;
+ for (uint i = 0; i < count; ++i) {
+ if (receiver.fetch(in, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") %
(i+1)).str());
+ fix.session.reject(in);
+ } else {
+ BOOST_FAIL((boost::format("Message_%1% not received as expected")
% (i+1)).str());
+ break;
+ }
+ }
+ //send another batch of messages
+ for (uint i = 0; i < count; i++) {
+ sender.send(Message((boost::format("Message_%1%") % (i+count)).str()));
+ }
+
+ for (uint i = 0; i < count; ++i) {
+ if (receiver.fetch(in, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") %
(i+count)).str());
+ } else {
+ BOOST_FAIL((boost::format("Message_%1% not received as expected")
% (i+count)).str());
+ break;
+ }
+ }
+ fix.session.acknowledge();
+ receiver.close();
+ sender.close();
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]