Author: gsim
Date: Mon Jun 23 23:02:26 2014
New Revision: 1604952

URL: http://svn.apache.org/r1604952
Log:
QPID-5828: Check and adjust ttl on resend attempt

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Jun 23 23:02:26 2014
@@ -642,9 +642,7 @@ void ExchangeSink::declare(qpid::client:
 
 void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
-    m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
-    m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
-    QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
+    m.send(session, name, m.getSubject());
 }
 
 void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
@@ -663,9 +661,7 @@ void QueueSink::declare(qpid::client::As
 }
 void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
 {
-    m.message.getDeliveryProperties().setRoutingKey(name);
-    m.status = session.messageTransfer(arg::content=m.message);
-    QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
+    m.send(session, name);
 }
 
 void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Mon Jun 23 23:02:26 2014
@@ -33,7 +33,7 @@ class Message;
 namespace client {
 namespace amqp0_10 {
 
-struct OutgoingMessage;
+class OutgoingMessage;
 
 /**
  *

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Mon Jun 23 23:02:26 2014
@@ -27,6 +27,7 @@
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/MessageImpl.h"
 #include "qpid/framing/enum.h"
+#include "qpid/log/Statement.h"
 #include <sstream>
 
 namespace qpid {
@@ -111,6 +112,7 @@ void OutgoingMessage::convert(const qpid
     if (i != from.getProperties().end()) {
         message.getMessageProperties().setContentEncoding(i->second.asString());
     }
+    base = qpid::sys::now();
 }
 
 void OutgoingMessage::setSubject(const std::string& s)
@@ -123,4 +125,45 @@ std::string OutgoingMessage::getSubject(
     return subject;
 }
 
+void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey)
+{
+    if (!expired) {
+        message.getDeliveryProperties().setRoutingKey(routingKey);
+        status = session.messageTransfer(arg::destination=destination, arg::content=message);
+        if (destination.empty()) {
+            QPID_LOG(debug, "Sending to queue " << routingKey << " " << message.getMessageProperties() << " " << message.getDeliveryProperties());
+        } else {
+            QPID_LOG(debug, "Sending to exchange " << destination << " " << message.getMessageProperties() << " " << message.getDeliveryProperties());
+        }
+    }
+}
+void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& routingKey)
+{
+    send(session, std::string(), routingKey);
+}
+
+bool OutgoingMessage::isComplete()
+{
+    return expired || (status.isValid() && status.isComplete());
+}
+void OutgoingMessage::markRedelivered()
+{
+    message.setRedelivered(true);
+    if (message.getDeliveryProperties().hasTtl()) {
+        uint64_t delta = qpid::sys::Duration(base, qpid::sys::now())/qpid::sys::TIME_MSEC;
+        uint64_t ttl = message.getDeliveryProperties().getTtl();
+        if (ttl <= delta) {
+            QPID_LOG(debug, "Expiring outgoing message (" << ttl << " < " << delta << ")");
+            expired = true;
+            message.getDeliveryProperties().setTtl(1);
+        } else {
+            QPID_LOG(debug, "Adjusting ttl on outgoing message from " << ttl << " by " << delta);
+            ttl = ttl - delta;
+            message.getDeliveryProperties().setTtl(ttl);
+        }
+    }
+}
+OutgoingMessage::OutgoingMessage() : expired (false) {}
+
+
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h Mon Jun 23 23:02:26 2014
@@ -21,8 +21,10 @@
  * under the License.
  *
  */
+#include "qpid/client/AsyncSession.h"
 #include "qpid/client/Completion.h"
 #include "qpid/client/Message.h"
+#include "qpid/sys/Time.h"
 
 namespace qpid {
 namespace messaging {
@@ -31,15 +33,24 @@ class Message;
 namespace client {
 namespace amqp0_10 {
 
-struct OutgoingMessage
+class OutgoingMessage
 {
+  private:
     qpid::client::Message message;
     qpid::client::Completion status;
     std::string subject;
+    qpid::sys::AbsTime base;
+    bool expired;
 
+  public:
+    OutgoingMessage();
     void convert(const qpid::messaging::Message&);
     void setSubject(const std::string& subject);
     std::string getSubject() const;
+    void send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey);
+    void send(qpid::client::AsyncSession& session,const std::string& routingKey);
+    bool isComplete();
+    void markRedelivered();
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Jun 23 23:02:26 2014
@@ -140,7 +140,7 @@ void SenderImpl::sendUnreliable(const qp
 void SenderImpl::replay(const sys::Mutex::ScopedLock&)
 {
     for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
-        i->message.setRedelivered(true);
+        i->markRedelivered();
         sink->send(session, name, *i);
     }
 }
@@ -158,7 +158,7 @@ uint32_t SenderImpl::checkPendingSends(b
     } else {
         flushed = false;
     }
-    while (!outgoing.empty() && outgoing.front().status.isValid() && outgoing.front().status.isComplete()) {
+    while (!outgoing.empty() && outgoing.front().isComplete()) {
         outgoing.pop_front();
     }
     return outgoing.size();



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to