Author: gsim
Date: Wed Aug 28 12:41:31 2013
New Revision: 1518182
URL: http://svn.apache.org/r1518182
Log:
QPID-4978: add support for reliability option
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Aug 28 12:41:31
2013
@@ -52,7 +52,8 @@ OutgoingFromQueue::OutgoingFromQueue(Bro
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
- buffer(1024)/*used only for header at present*/
+ buffer(1024)/*used only for header at present*/,
+ unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -105,6 +106,7 @@ void OutgoingFromQueue::handle(pn_delive
write(&buffer[0], encoder.getPosition());
Translation t(r.msg);
t.write(*this);
+ if (unreliable) pn_delivery_settle(delivery);
if (pn_link_advance(link)) {
--outstanding;
outgoingMessageSent();
@@ -113,7 +115,10 @@ void OutgoingFromQueue::handle(pn_delive
QPID_LOG(error, "Failed to send message " << r.msg.getSequence()
<< " from " << queue->getName() << ", index=" << r.index);
}
}
- if (pn_delivery_updated(delivery)) {
+ if (unreliable) {
+ if (preAcquires()) queue->dequeue(0, r.cursor);
+ r.reset();
+ } else if (pn_delivery_updated(delivery)) {
assert(r.delivery == delivery);
r.disposition = pn_delivery_remote_state(delivery);
if (r.disposition) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Wed Aug 28 12:41:31 2013
@@ -135,6 +135,7 @@ class OutgoingFromQueue : public Outgoin
std::vector<char> buffer;
std::string subjectFilter;
boost::scoped_ptr<Selector> selector;
+ bool unreliable;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Aug 28 12:41:31
2013
@@ -317,7 +317,6 @@ void Session::setupOutgoing(pn_link_t* l
target = targetAddress;
}
-
if (node.queue) {
authorise.outgoing(node.queue);
SubscriptionType type = pn_terminus_get_distribution_mode(source) ==
PN_DIST_MODE_COPY ? BROWSER : CONSUMER;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Wed Aug 28
12:41:31 2013
@@ -73,6 +73,12 @@ const std::string SUBJECT_FILTER("subjec
const std::string SOURCE("sender-source");
const std::string TARGET("receiver-target");
+//reliability options:
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
+
//distribution modes:
const std::string MOVE("move");
const std::string COPY("copy");
@@ -293,6 +299,7 @@ AddressHelper::AddressHelper(const Addre
bind(address, LINK, link);
bind(node, PROPERTIES, properties);
bind(node, CAPABILITIES, capabilities);
+ bind(link, RELIABILITY, reliability);
durableNode = test(node, DURABLE);
durableLink = test(link, DURABLE);
timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT :
DEFAULT_TIMEOUT);
@@ -506,6 +513,11 @@ bool AddressHelper::enabled(const std::s
return result;
}
+bool AddressHelper::isUnreliable() const
+{
+ return reliability == AT_MOST_ONCE || reliability == UNRELIABLE;
+}
+
const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
{
return node;
@@ -536,7 +548,7 @@ bool AddressHelper::getLinkOption(const
}
}
-void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
+void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus,
CheckMode mode)
{
bool createOnDemand(false);
if (isTemporary) {
@@ -581,7 +593,9 @@ void AddressHelper::configure(pn_terminu
pn_data_exit(filter);
}
}
-
+ if (isUnreliable()) {
+ pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+ }
}
void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Wed Aug 28
12:41:31 2013
@@ -24,6 +24,7 @@
#include "qpid/types/Variant.h"
#include <vector>
+struct pn_link_t;
struct pn_terminus_t;
namespace qpid {
@@ -36,9 +37,10 @@ class AddressHelper
enum CheckMode {FOR_RECEIVER, FOR_SENDER};
AddressHelper(const Address& address);
- void configure(pn_terminus_t* terminus, CheckMode mode);
+ void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode);
void checkAssertion(pn_terminus_t* terminus, CheckMode mode);
+ bool isUnreliable() const;
const qpid::types::Variant::Map& getNodeProperties() const;
bool getLinkSource(std::string& out) const;
bool getLinkTarget(std::string& out) const;
@@ -68,6 +70,7 @@ class AddressHelper
qpid::types::Variant::List capabilities;
std::string name;
std::string type;
+ std::string reliability;
bool durableNode;
bool durableLink;
uint32_t timeout;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Wed Aug
28 12:41:31 2013
@@ -377,12 +377,12 @@ void ConnectionContext::send(boost::shar
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
SenderContext::Delivery* delivery(0);
- while (!(delivery = snd->send(message))) {
+ while (!snd->send(message, &delivery)) {
QPID_LOG(debug, "Waiting for capacity...");
wait(ssn, snd);//wait for capacity
}
wakeupDriver();
- if (sync) {
+ if (sync && delivery) {
while (!delivery->accepted()) {
QPID_LOG(debug, "Waiting for confirmation...");
wait(ssn, snd);//wait until message has been confirmed
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Wed Aug 28
12:41:31 2013
@@ -99,7 +99,7 @@ void ReceiverContext::configure()
}
void ReceiverContext::configure(pn_terminus_t* source)
{
- helper.configure(source, AddressHelper::FOR_RECEIVER);
+ helper.configure(receiver, source, AddressHelper::FOR_RECEIVER);
std::string option;
if (helper.getLinkTarget(option)) {
pn_terminus_set_address(pn_link_target(receiver), option.c_str());
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Wed Aug 28
12:41:31 2013
@@ -42,7 +42,7 @@ SenderContext::SenderContext(pn_session_
: name(n),
address(a),
helper(address),
- sender(pn_sender(session, n.c_str())), capacity(1000) {}
+ sender(pn_sender(session, n.c_str())), capacity(1000),
unreliable(helper.isUnreliable()) {}
SenderContext::~SenderContext()
{
@@ -80,16 +80,25 @@ const std::string& SenderContext::getTar
return address.getName();
}
-SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message&
message)
+bool SenderContext::send(const qpid::messaging::Message& message,
SenderContext::Delivery** out)
{
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
- deliveries.push_back(Delivery(nextId++));
- Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message), address);
- delivery.send(sender);
- return &delivery;
+ if (unreliable) {
+ Delivery delivery(nextId++);
+ delivery.encode(MessageImplAccess::get(message), address);
+ delivery.send(sender, unreliable);
+ *out = 0;
+ return true;
+ } else {
+ deliveries.push_back(Delivery(nextId++));
+ Delivery& delivery = deliveries.back();
+ delivery.encode(MessageImplAccess::get(message), address);
+ delivery.send(sender, unreliable);
+ *out = &delivery;
+ return true;
+ }
} else {
- return 0;
+ return false;
}
}
@@ -474,13 +483,14 @@ void SenderContext::Delivery::encode(con
//write footer (no annotations yet supported)
}
}
-void SenderContext::Delivery::send(pn_link_t* sender)
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
{
pn_delivery_tag_t tag;
tag.size = sizeof(id);
tag.bytes = reinterpret_cast<const char*>(&id);
token = pn_delivery(sender, tag);
pn_link_send(sender, encoded.getData(), encoded.getSize());
+ if (unreliable) pn_delivery_settle(token);
pn_link_advance(sender);
}
@@ -520,7 +530,7 @@ void SenderContext::configure()
}
void SenderContext::configure(pn_terminus_t* target)
{
- helper.configure(target, AddressHelper::FOR_SENDER);
+ helper.configure(sender, target, AddressHelper::FOR_SENDER);
std::string option;
if (helper.getLinkSource(option)) {
pn_terminus_set_address(pn_link_source(sender), option.c_str());
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Wed Aug 28
12:41:31 2013
@@ -52,7 +52,7 @@ class SenderContext
public:
Delivery(int32_t id);
void encode(const qpid::messaging::MessageImpl& message, const
qpid::messaging::Address&);
- void send(pn_link_t*);
+ void send(pn_link_t*, bool unreliable);
bool delivered();
bool accepted();
bool rejected();
@@ -71,7 +71,7 @@ class SenderContext
uint32_t getUnsettled();
const std::string& getName() const;
const std::string& getTarget() const;
- Delivery* send(const qpid::messaging::Message& message);
+ bool send(const qpid::messaging::Message& message, Delivery**);
void configure();
void verify(pn_terminus_t*);
void check();
@@ -88,6 +88,7 @@ class SenderContext
int32_t nextId;
Deliveries deliveries;
uint32_t capacity;
+ bool unreliable;
uint32_t processUnsettled(bool silent);
void configure(pn_terminus_t*);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]