Author: gsim
Date: Mon Jun 23 23:02:33 2014
New Revision: 1604953

URL: http://svn.apache.org/r1604953
Log:
QPID-5828: Drop expired incoming messages

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Jun 23 23:02:33 2014
@@ -480,6 +480,7 @@ void SessionImpl::deliver(AMQFrame& fram
         //as completion affects flow control; other commands will be
         //considered completed as soon as processed here
         if (arriving->isA<MessageTransferBody>()) {
+            arriving->setReceived();
             Lock l(state);
             incompleteIn.add(arriving->getId());
         } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Jun 
23 23:02:33 2014
@@ -132,12 +132,16 @@ bool IncomingMessages::get(Handler& hand
     AbsTime deadline(AbsTime::now(), timeout);
     do {
         //search through received list for any transfer of interest:
-        for (FrameSetQueue::iterator i = received.begin(); i != 
received.end(); i++)
+        for (FrameSetQueue::iterator i = received.begin(); i != 
received.end();)
         {
             MessageTransfer transfer(*i, *this);
-            if (handler.accept(transfer)) {
+            if (transfer.checkExpired()) {
+                i = received.erase(i);
+            } else if (handler.accept(transfer)) {
                 received.erase(i);
                 return true;
+            } else {
+                ++i;
             }
         }
         if (inUse) {
@@ -260,7 +264,9 @@ bool IncomingMessages::process(Handler* 
         for (Duration timeout = duration; pop(content, timeout); timeout = 
Duration(AbsTime::now(), deadline)) {
             if (content->isA<MessageTransferBody>()) {
                 MessageTransfer transfer(content, *this);
-                if (handler && handler->accept(transfer)) {
+                if (transfer.checkExpired()) {
+                    QPID_LOG(debug, "Expired received transfer: " << 
*content->getMethod());
+                } else if (handler && handler->accept(transfer)) {
                     QPID_LOG(debug, "Delivered " << *content->getMethod() << " 
"
                              << *content->getHeaders());
                     return true;
@@ -359,6 +365,16 @@ void IncomingMessages::MessageTransfer::
     parent.retrieve(content, message);
 }
 
+bool IncomingMessages::MessageTransfer::checkExpired()
+{
+    if (content->hasExpired()) {
+        retrieve(0);
+        parent.accept(content->getId(), false);
+        return true;
+    } else {
+        return false;
+    }
+}
 
 namespace {
 //TODO: unify conversion to and from 0-10 message that is currently

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Mon Jun 23 
23:02:33 2014
@@ -57,6 +57,7 @@ class IncomingMessages
       private:
         FrameSetPtr content;
         IncomingMessages& parent;
+        bool checkExpired();
 
         MessageTransfer(FrameSetPtr, IncomingMessages&);
       friend class IncomingMessages;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Mon Jun 23 
23:02:33 2014
@@ -24,6 +24,7 @@
 #include "qpid/types/encodings.h"
 #include "qpid/types/Variant.h"
 #include "qpid/messaging/Address.h"
+#include "qpid/messaging/Duration.h"
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/MessageImpl.h"
 #include "qpid/framing/enum.h"

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Mon Jun 23 23:02:33 2014
@@ -26,9 +26,11 @@
 #include "qpid/framing/TypeFilter.h"
 
 using namespace qpid::framing;
+using qpid::sys::AbsTime;
+using qpid::sys::TIME_MSEC;
 
-FrameSet::FrameSet(const SequenceNumber& _id) : 
id(_id),contentSize(0),recalculateSize(true) { }
-FrameSet::FrameSet(const FrameSet& original) : id(original.id), 
contentSize(0), recalculateSize(true)
+FrameSet::FrameSet(const SequenceNumber& _id) : 
id(_id),contentSize(0),recalculateSize(true),received(AbsTime::FarFuture()) { }
+FrameSet::FrameSet(const FrameSet& original) : id(original.id), 
contentSize(0), recalculateSize(true), received(AbsTime::FarFuture())
 {
     for (Frames::const_iterator i = original.begin(); i != original.end(); 
++i) {
         parts.push_back(AMQFrame(*(i->getBody())));
@@ -106,3 +108,21 @@ std::string FrameSet::getContent() const
 bool FrameSet::hasContent() const {
     return parts.size() >= 3;
 }
+
+void FrameSet::setReceived()
+{
+    received = AbsTime::now();
+}
+namespace {
+uint64_t MAX_TTL = std::numeric_limits<int64_t>::max()/TIME_MSEC;
+}
+
+bool FrameSet::hasExpired() const
+{
+    const DeliveryProperties* props = 
getHeaderProperties<DeliveryProperties>();
+    if (props && props->hasTtl() && props->getTtl() < MAX_TTL) {
+        AbsTime expiration(received, props->getTtl()*TIME_MSEC);
+        return expiration < AbsTime::now();
+    }
+    return false;
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=1604953&r1=1604952&r2=1604953&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Mon Jun 23 23:02:33 2014
@@ -26,6 +26,7 @@
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Time.h"
 #include "qpid/CommonImportExport.h"
 
 namespace qpid {
@@ -39,8 +40,9 @@ class FrameSet
     typedef InlineVector<AMQFrame, 4> Frames;
     const SequenceNumber id;
     Frames parts;
-       mutable uint64_t contentSize;
-       mutable bool recalculateSize;
+    mutable uint64_t contentSize;
+    mutable bool recalculateSize;
+    qpid::sys::AbsTime received;
 
 public:
     typedef boost::shared_ptr<FrameSet> shared_ptr;
@@ -58,6 +60,9 @@ public:
     QPID_COMMON_EXTERN std::string getContent() const;
     QPID_COMMON_EXTERN bool hasContent() const;
 
+    QPID_COMMON_EXTERN void setReceived();
+    QPID_COMMON_EXTERN bool hasExpired() const;
+
     bool isContentBearing() const;
 
     QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to