Author: gsim
Date: Mon Jun 23 23:02:33 2014
New Revision: 1604953
URL: http://svn.apache.org/r1604953
Log:
QPID-5828: Drop expired incoming messages
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Jun 23 23:02:33 2014
@@ -480,6 +480,7 @@ void SessionImpl::deliver(AMQFrame& fram
//as completion affects flow control; other commands will be
//considered completed as soon as processed here
if (arriving->isA<MessageTransferBody>()) {
+ arriving->setReceived();
Lock l(state);
incompleteIn.add(arriving->getId());
} else {
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Jun 23 23:02:33 2014
@@ -132,12 +132,16 @@ bool IncomingMessages::get(Handler& hand
AbsTime deadline(AbsTime::now(), timeout);
do {
//search through received list for any transfer of interest:
- for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end();)
{
MessageTransfer transfer(*i, *this);
- if (handler.accept(transfer)) {
+ if (transfer.checkExpired()) {
+ i = received.erase(i);
+ } else if (handler.accept(transfer)) {
received.erase(i);
return true;
+ } else {
+ ++i;
}
}
if (inUse) {
@@ -260,7 +264,9 @@ bool IncomingMessages::process(Handler*
for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
- if (handler && handler->accept(transfer)) {
+ if (transfer.checkExpired()) {
+ QPID_LOG(debug, "Expired received transfer: " << *content->getMethod());
+ } else if (handler && handler->accept(transfer)) {
QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
<< *content->getHeaders());
return true;
@@ -359,6 +365,16 @@ void IncomingMessages::MessageTransfer::
parent.retrieve(content, message);
}
+bool IncomingMessages::MessageTransfer::checkExpired()
+{
+ if (content->hasExpired()) {
+ retrieve(0);
+ parent.accept(content->getId(), false);
+ return true;
+ } else {
+ return false;
+ }
+}
namespace {
//TODO: unify conversion to and from 0-10 message that is currently
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Mon Jun 23 23:02:33 2014
@@ -57,6 +57,7 @@ class IncomingMessages
private:
FrameSetPtr content;
IncomingMessages& parent;
+ bool checkExpired();
MessageTransfer(FrameSetPtr, IncomingMessages&);
friend class IncomingMessages;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Mon Jun 23 23:02:33 2014
@@ -24,6 +24,7 @@
#include "qpid/types/encodings.h"
#include "qpid/types/Variant.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/framing/enum.h"
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Mon Jun 23 23:02:33 2014
@@ -26,9 +26,11 @@
#include "qpid/framing/TypeFilter.h"
using namespace qpid::framing;
+using qpid::sys::AbsTime;
+using qpid::sys::TIME_MSEC;
-FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true) { }
-FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true)
+FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true),received(AbsTime::FarFuture()) { }
+FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true), received(AbsTime::FarFuture())
{
for (Frames::const_iterator i = original.begin(); i != original.end(); ++i) {
parts.push_back(AMQFrame(*(i->getBody())));
@@ -106,3 +108,21 @@ std::string FrameSet::getContent() const
bool FrameSet::hasContent() const {
return parts.size() >= 3;
}
+
+void FrameSet::setReceived()
+{
+ received = AbsTime::now();
+}
+namespace {
+uint64_t MAX_TTL = std::numeric_limits<int64_t>::max()/TIME_MSEC;
+}
+
+bool FrameSet::hasExpired() const
+{
+ const DeliveryProperties* props = getHeaderProperties<DeliveryProperties>();
+ if (props && props->hasTtl() && props->getTtl() < MAX_TTL) {
+ AbsTime expiration(received, props->getTtl()*TIME_MSEC);
+ return expiration < AbsTime::now();
+ }
+ return false;
+}
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Mon Jun 23 23:02:33 2014
@@ -26,6 +26,7 @@
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Time.h"
#include "qpid/CommonImportExport.h"
namespace qpid {
@@ -39,8 +40,9 @@ class FrameSet
typedef InlineVector<AMQFrame, 4> Frames;
const SequenceNumber id;
Frames parts;
- mutable uint64_t contentSize;
- mutable bool recalculateSize;
+ mutable uint64_t contentSize;
+ mutable bool recalculateSize;
+ qpid::sys::AbsTime received;
public:
typedef boost::shared_ptr<FrameSet> shared_ptr;
@@ -58,6 +60,9 @@ public:
QPID_COMMON_EXTERN std::string getContent() const;
QPID_COMMON_EXTERN bool hasContent() const;
+ QPID_COMMON_EXTERN void setReceived();
+ QPID_COMMON_EXTERN bool hasExpired() const;
+
bool isContentBearing() const;
QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org