Author: gsim
Date: Mon Jun 23 23:02:26 2014
New Revision: 1604952
URL: http://svn.apache.org/r1604952
Log:
QPID-5828: Check and adjust ttl on resend attempt
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Jun 23 23:02:26 2014
@@ -642,9 +642,7 @@ void ExchangeSink::declare(qpid::client:
void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
- m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
- QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
+ m.send(session, name, m.getSubject());
}
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
@@ -663,9 +661,7 @@ void QueueSink::declare(qpid::client::As
}
void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- m.message.getDeliveryProperties().setRoutingKey(name);
- m.status = session.messageTransfer(arg::content=m.message);
- QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
+ m.send(session, name);
}
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Mon Jun 23 23:02:26 2014
@@ -33,7 +33,7 @@ class Message;
namespace client {
namespace amqp0_10 {
-struct OutgoingMessage;
+class OutgoingMessage;
/**
*
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Mon Jun 23 23:02:26 2014
@@ -27,6 +27,7 @@
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/framing/enum.h"
+#include "qpid/log/Statement.h"
#include <sstream>
namespace qpid {
@@ -111,6 +112,7 @@ void OutgoingMessage::convert(const qpid
if (i != from.getProperties().end()) {
message.getMessageProperties().setContentEncoding(i->second.asString());
}
+ base = qpid::sys::now();
}
void OutgoingMessage::setSubject(const std::string& s)
@@ -123,4 +125,45 @@ std::string OutgoingMessage::getSubject(
return subject;
}
+void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey)
+{
+ if (!expired) {
+ message.getDeliveryProperties().setRoutingKey(routingKey);
+ status = session.messageTransfer(arg::destination=destination, arg::content=message);
+ if (destination.empty()) {
+ QPID_LOG(debug, "Sending to queue " << routingKey << " " << message.getMessageProperties() << " " << message.getDeliveryProperties());
+ } else {
+ QPID_LOG(debug, "Sending to exchange " << destination << " " << message.getMessageProperties() << " " << message.getDeliveryProperties());
+ }
+ }
+}
+void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& routingKey)
+{
+ send(session, std::string(), routingKey);
+}
+
+bool OutgoingMessage::isComplete()
+{
+ return expired || (status.isValid() && status.isComplete());
+}
+void OutgoingMessage::markRedelivered()
+{
+ message.setRedelivered(true);
+ if (message.getDeliveryProperties().hasTtl()) {
+ uint64_t delta = qpid::sys::Duration(base, qpid::sys::now())/qpid::sys::TIME_MSEC;
+ uint64_t ttl = message.getDeliveryProperties().getTtl();
+ if (ttl <= delta) {
+ QPID_LOG(debug, "Expiring outgoing message (" << ttl << " < " << delta << ")");
+ expired = true;
+ message.getDeliveryProperties().setTtl(1);
+ } else {
+ QPID_LOG(debug, "Adjusting ttl on outgoing message from " << ttl << " by " << delta);
+ ttl = ttl - delta;
+ message.getDeliveryProperties().setTtl(ttl);
+ }
+ }
+}
+OutgoingMessage::OutgoingMessage() : expired (false) {}
+
+
}}} // namespace qpid::client::amqp0_10
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h Mon Jun 23 23:02:26 2014
@@ -21,8 +21,10 @@
* under the License.
*
*/
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/Completion.h"
#include "qpid/client/Message.h"
+#include "qpid/sys/Time.h"
namespace qpid {
namespace messaging {
@@ -31,15 +33,24 @@ class Message;
namespace client {
namespace amqp0_10 {
-struct OutgoingMessage
+class OutgoingMessage
{
+ private:
qpid::client::Message message;
qpid::client::Completion status;
std::string subject;
+ qpid::sys::AbsTime base;
+ bool expired;
+ public:
+ OutgoingMessage();
void convert(const qpid::messaging::Message&);
void setSubject(const std::string& subject);
std::string getSubject() const;
+ void send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey);
+ void send(qpid::client::AsyncSession& session,const std::string& routingKey);
+ bool isComplete();
+ void markRedelivered();
};
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1604952&r1=1604951&r2=1604952&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Jun 23 23:02:26 2014
@@ -140,7 +140,7 @@ void SenderImpl::sendUnreliable(const qp
void SenderImpl::replay(const sys::Mutex::ScopedLock&)
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
- i->message.setRedelivered(true);
+ i->markRedelivered();
sink->send(session, name, *i);
}
}
@@ -158,7 +158,7 @@ uint32_t SenderImpl::checkPendingSends(b
} else {
flushed = false;
}
- while (!outgoing.empty() && outgoing.front().status.isValid() && outgoing.front().status.isComplete()) {
+ while (!outgoing.empty() && outgoing.front().isComplete()) {
outgoing.pop_front();
}
return outgoing.size();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]