Author: gsim
Date: Thu Apr 28 19:01:29 2016
New Revision: 1741491

URL: http://svn.apache.org/viewvc?rev=1741491&view=rev
Log:
QPID-7234: allow proper credit processing to happen even for expired messages

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu Apr 28 19:01:29 2016 @@ -55,15 +55,17 @@ const std::string EMPTY_STRING; struct GetNone : IncomingMessages::Handler { bool accept(IncomingMessages::MessageTransfer&) { return false; } + bool expire(IncomingMessages::MessageTransfer&) { return false; } }; struct GetAny : IncomingMessages::Handler { bool accept(IncomingMessages::MessageTransfer& transfer) - { + { transfer.retrieve(0); return true; } + bool expire(IncomingMessages::MessageTransfer&) { return false; } }; struct MatchAndTrack @@ -147,7 +149,7 @@ bool IncomingMessages::get(Handler& hand for (FrameSetQueue::iterator i = received.begin(); i != received.end();) { MessageTransfer transfer(*i, *this); - if (transfer.checkExpired()) { + if (transfer.checkExpired() && handler.expire(transfer)) { i = received.erase(i); } else if (handler.accept(transfer)) { received.erase(i); @@ -282,7 +284,7 @@ IncomingMessages::ProcessState IncomingM for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { if (content->isA<MessageTransferBody>()) { MessageTransfer transfer(content, *this); - if (transfer.checkExpired()) { + if (transfer.checkExpired() && handler->expire(transfer)) { QPID_LOG(debug, "Expired received 
transfer: " << *content->getMethod()); } else if (handler && handler->accept(transfer)) { QPID_LOG(debug, "Delivered " << *content->getMethod() << " " Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1741491&r1=1741490&r2=1741491&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu Apr 28 19:01:29 2016 @@ -67,6 +67,7 @@ class IncomingMessages { virtual ~Handler() {} virtual bool accept(MessageTransfer& transfer) = 0; + virtual bool expire(MessageTransfer& transfer) = 0; virtual bool isClosed() { return false; } }; Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Thu Apr 28 19:01:29 2016 @@ -36,7 +36,7 @@ using qpid::messaging::NoMessageAvailabl using qpid::messaging::Receiver; using qpid::messaging::Duration; -void ReceiverImpl::received(qpid::messaging::Message&) +void ReceiverImpl::received() { //TODO: should this be configurable sys::Mutex::ScopedLock l(lock); Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=1741491&r1=1741490&r2=1741491&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original) +++ 
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Thu Apr 28 19:01:29 2016 @@ -63,7 +63,7 @@ class ReceiverImpl : public qpid::messag uint32_t getCapacity(); uint32_t getAvailable(); uint32_t getUnsettled(); - void received(qpid::messaging::Message& message); + void received(); qpid::messaging::Session getSession() const; bool isClosed() const; qpid::messaging::Address getAddress() const; Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1741491&r1=1741490&r2=1741491&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Apr 28 19:01:29 2016 @@ -330,6 +330,16 @@ struct IncomingMessageHandler : Incoming return callback(transfer); } + bool expire(IncomingMessages::MessageTransfer& transfer) + { + if (receiver && receiver->getName() == transfer.getDestination()) { + receiver->received(); + return true; + } else { + return false; + } + } + bool isClosed() { return receiver && receiver->isClosed(); @@ -358,7 +368,7 @@ bool SessionImpl::accept(ReceiverImpl* r { if (receiver->getName() == transfer.getDestination()) { transfer.retrieve(message); - receiver->received(*message); + receiver->received(); return true; } else { return false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org