Author: gsim
Date: Mon Apr 1 16:14:19 2013
New Revision: 1463181
URL: http://svn.apache.org/r1463181
Log:
QPID-4679: Cleaned up processing of addresses, including errors and warnings
where elements can not be supported. Merged from r1462646.
Modified:
qpid/branches/0.22/qpid/ (props changed)
qpid/branches/0.22/qpid/cpp/src/ (props changed)
qpid/branches/0.22/qpid/cpp/src/qpid/broker/ (props changed)
qpid/branches/0.22/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
Propchange: qpid/branches/0.22/qpid/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid:r1462646
Propchange: qpid/branches/0.22/qpid/cpp/src/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src:r1462646
Propchange: qpid/branches/0.22/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1462646
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1463181&r1=1463180&r2=1463181&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/broker/amqp/Session.cpp Mon Apr 1
16:14:19 2013
@@ -63,8 +63,44 @@ bool is_capability_requested(const std::
}
return false;
}
-
+//capabilities
const std::string CREATE_ON_DEMAND("create-on-demand");
+const std::string DURABLE("durable");
+const std::string QUEUE("queue");
+const std::string TOPIC("topic");
+const std::string DIRECT_FILTER("legacy-amqp-direct-binding");
+const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue>
node)
+{
+ while (pn_data_next(in)) {
+ pn_bytes_t c = pn_data_get_symbol(in);
+ std::string s(c.start, c.size);
+ if (s == DURABLE) {
+ if (node->isDurable()) pn_data_put_symbol(out, c);
+ } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER
|| s == TOPIC_FILTER) {
+ pn_data_put_symbol(out, c);
+ }
+ }
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out,
boost::shared_ptr<Exchange> node)
+{
+ while (pn_data_next(in)) {
+ pn_bytes_t c = pn_data_get_symbol(in);
+ std::string s(c.start, c.size);
+ if (s == DURABLE) {
+ if (node->isDurable()) pn_data_put_symbol(out, c);
+ } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
+ pn_data_put_symbol(out, c);
+ } else if (s == DIRECT_FILTER) {
+ if (node->getType() == DirectExchange::typeName)
pn_data_put_symbol(out, c);
+ } else if (s == TOPIC_FILTER) {
+ if (node->getType() == TopicExchange::typeName)
pn_data_put_symbol(out, c);
+ }
+ }
+}
+
}
class IncomingToQueue : public DecodingIncoming
@@ -178,6 +214,10 @@ void Session::attach(pn_link_t* link)
void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const
std::string& name)
{
ResolvedNode node = resolve(name, target, true);
+ //set capabilities
+ if (node.queue) setCapabilities(pn_terminus_capabilities(target),
pn_terminus_capabilities(pn_link_target(link)), node.queue);
+ else if (node.exchange) setCapabilities(pn_terminus_capabilities(target),
pn_terminus_capabilities(pn_link_target(link)), node.exchange);
+
const char* sourceAddress =
pn_terminus_get_address(pn_link_remote_source(link));
if (!sourceAddress) {
sourceAddress = pn_terminus_get_address(pn_link_source(link));
@@ -205,6 +245,9 @@ void Session::setupIncoming(pn_link_t* l
void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const
std::string& name)
{
ResolvedNode node = resolve(name, source, false);
+ if (node.queue) setCapabilities(pn_terminus_capabilities(source),
pn_terminus_capabilities(pn_link_source(link)), node.queue);
+ else if (node.exchange) setCapabilities(pn_terminus_capabilities(source),
pn_terminus_capabilities(pn_link_source(link)), node.exchange);
+
Filter filter;
filter.read(pn_terminus_filter(source));
const char* targetAddress =
pn_terminus_get_address(pn_link_remote_target(link));
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1463181&r1=1463180&r2=1463181&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
(original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Mon
Apr 1 16:14:19 2013
@@ -20,8 +20,10 @@
*/
#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/AddressImpl.h"
#include "qpid/log/Statement.h"
#include <vector>
+#include <set>
#include <boost/assign.hpp>
extern "C" {
#include <proton/engine.h>
@@ -47,10 +49,13 @@ const std::string SENDER("sender");
const std::string NODE("node");
const std::string LINK("link");
+const std::string CAPABILITIES("capabilities");
+const std::string PROPERTIES("properties");
const std::string TYPE("type");
const std::string TOPIC("topic");
const std::string QUEUE("queue");
+const std::string DURABLE("durable");
//distribution modes:
const std::string MOVE("move");
@@ -61,6 +66,11 @@ const std::string CREATE_ON_DEMAND("crea
const std::string DUMMY(".");
+const std::string X_DECLARE("x-declare");
+const std::string X_BINDINGS("x-bindings");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string ARGUMENTS("arguments");
+
const std::vector<std::string> RECEIVER_MODES =
boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
const std::vector<std::string> SENDER_MODES =
boost::assign::list_of<std::string>(ALWAYS) (SENDER);
@@ -72,6 +82,24 @@ pn_bytes_t convert(const std::string& s)
return result;
}
+bool contains(const Variant::List& list, const std::string& item)
+{
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i)
{
+ if (*i == item) return true;
+ }
+ return false;
+}
+
+bool test(const Variant::Map& options, const std::string& name)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ return j->second;
+ }
+}
+
bool bind(const Variant::Map& options, const std::string& name, std::string&
variable)
{
Variant::Map::const_iterator j = options.find(name);
@@ -94,6 +122,17 @@ bool bind(const Variant::Map& options, c
}
}
+bool bind(const Variant::Map& options, const std::string& name, Variant::List&
variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asList();
+ return true;
+ }
+}
+
bool bind(const Address& address, const std::string& name, std::string&
variable)
{
return bind(address.getOptions(), name, variable);
@@ -111,9 +150,23 @@ bool in(const std::string& value, const
}
return false;
}
+void add(Variant::Map& target, const Variant::Map& source)
+{
+ for (Variant::Map::const_iterator i = source.begin(); i != source.end();
++i) {
+ target[i->first] = i->second;
+ }
+}
+void flatten(Variant::Map& base, const std::string& nested)
+{
+ Variant::Map::iterator i = base.find(nested);
+ if (i != base.end()) {
+ add(base, i->second.asMap());
+ }
+ base.erase(i);
+}
}
-AddressHelper::AddressHelper(const Address& address)
+AddressHelper::AddressHelper(const Address& address) :
isTemporary(AddressImpl::isTemporary(address)), name(address.getName()),
type(address.getType())
{
bind(address, CREATE, createPolicy);
bind(address, DELETE, deletePolicy);
@@ -121,20 +174,81 @@ AddressHelper::AddressHelper(const Addre
bind(address, NODE, node);
bind(address, LINK, link);
+ bind(node, PROPERTIES, properties);
+ bind(node, CAPABILITIES, capabilities);
+ durableNode = test(node, DURABLE);
+
+ if (!deletePolicy.empty()) {
+ throw qpid::messaging::AddressError("Delete policies not supported
over AMQP 1.0.");
+ }
+ if (node.find(X_BINDINGS) != node.end()) {
+ throw qpid::messaging::AddressError("Node scoped x-bindings element
not supported over AMQP 1.0.");
+ }
+ if (link.find(X_BINDINGS) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-bindings element
not supported over AMQP 1.0.");
+ }
+ if (link.find(X_SUBSCRIBE) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-subscribe element
not supported over AMQP 1.0.");
+ }
+ if (link.find(X_DECLARE) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-declare element not
supported over AMQP 1.0.");
+ }
+ //massage x-declare into properties
+ Variant::Map::iterator i = node.find(X_DECLARE);
+ if (i != node.end()) {
+ Variant::Map x_declare = i->second.asMap();
+ flatten(x_declare, ARGUMENTS);
+ add(properties, x_declare);
+ node.erase(i);
+ }
+
+ if (properties.size() && !(isTemporary || createPolicy.size())) {
+ QPID_LOG(warning, "Properties will be ignored! " << address);
+ }
}
-bool AddressHelper::createEnabled(CheckMode mode) const
+void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
{
- return enabled(createPolicy, mode);
+ if (assertEnabled(mode)) {
+ QPID_LOG(debug, "checking assertions: " << capabilities);
+ //ensure all desired capabilities have been offerred
+ std::set<std::string> desired;
+ if (type.size()) desired.insert(type);
+ if (durableNode) desired.insert(DURABLE);
+ for (Variant::List::const_iterator i = capabilities.begin(); i !=
capabilities.end(); ++i) {
+ desired.insert(i->asString());
+ }
+ pn_data_t* data = pn_terminus_capabilities(terminus);
+ while (pn_data_next(data)) {
+ pn_bytes_t c = pn_data_get_symbol(data);
+ std::string s(c.start, c.size);
+ desired.erase(s);
+ }
+
+ if (desired.size()) {
+ std::stringstream missing;
+ missing << "Desired capabilities not met: ";
+ bool first(true);
+ for (std::set<std::string>::const_iterator i = desired.begin(); i
!= desired.end(); ++i) {
+ if (first) first = false;
+ else missing << ", ";
+ missing << *i;
+ }
+ throw qpid::messaging::AssertionFailed(missing.str());
+ }
+ }
}
-bool AddressHelper::deleteEnabled(CheckMode mode) const
+
+bool AddressHelper::createEnabled(CheckMode mode) const
{
- return enabled(deletePolicy, mode);
+ return enabled(createPolicy, mode);
}
+
bool AddressHelper::assertEnabled(CheckMode mode) const
{
return enabled(assertPolicy, mode);
}
+
bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const
{
bool result = false;
@@ -158,32 +272,53 @@ const qpid::types::Variant::Map& Address
return link;
}
-void AddressHelper::setNodeProperties(pn_terminus_t* terminus, bool dynamic)
+void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
{
- if (dynamic) {
- pn_terminus_set_address(terminus, DUMMY.c_str());//Workaround for
proton bug
+ bool createOnDemand(false);
+ if (isTemporary) {
+ //application expects a name to be generated
+ pn_terminus_set_address(terminus, DUMMY.c_str());//workaround for
PROTON-277
pn_terminus_set_dynamic(terminus, true);
+ setNodeProperties(terminus);
} else {
- pn_data_t* capabilities = pn_terminus_capabilities(terminus);
- if (!capabilities) {
- QPID_LOG(error, "!!!No capabilities!!!");
+ pn_terminus_set_address(terminus, name.c_str());
+ if (createEnabled(mode)) {
+ //application expects name of node to be as specified
+ setNodeProperties(terminus);
+ createOnDemand = true;
}
- pn_data_put_symbol(capabilities, convert(CREATE_ON_DEMAND));
}
+ setCapabilities(terminus, createOnDemand);
+}
- //properties for dynamically created node:
- if (node.size()) {
+void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
+{
+ pn_data_t* data = pn_terminus_capabilities(terminus);
+ if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND));
+ if (type.size()) pn_data_put_symbol(data, convert(type));
+ if (durableNode) pn_data_put_symbol(data, convert(DURABLE));
+ for (qpid::types::Variant::List::const_iterator i = capabilities.begin();
i != capabilities.end(); ++i) {
+ pn_data_put_symbol(data, convert(i->asString()));
+ }
+}
+
+void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
+{
+ if (properties.size() || type.size()) {
pn_data_t* data = pn_terminus_properties(terminus);
pn_data_put_map(data);
pn_data_enter(data);
- for (qpid::types::Variant::Map::const_iterator i = node.begin(); i !=
node.end(); ++i) {
- if (i->first == TYPE) {
- pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
- pn_data_put_string(data, convert(i->second == TOPIC ? COPY :
MOVE));
- } else {
- pn_data_put_symbol(data, convert(i->first));
- pn_data_put_string(data, convert(i->second.asString()));
- }
+ if (type.size()) {
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(type == TOPIC ? COPY : MOVE));
+ }
+ if (durableNode) {
+ pn_data_put_symbol(data, convert(DURABLE));
+ pn_data_put_bool(data, true);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin();
i != properties.end(); ++i) {
+ pn_data_put_symbol(data, convert(i->first));
+ pn_data_put_string(data, convert(i->second.asString()));
}
pn_data_exit(data);
}
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL:
http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1463181&r1=1463180&r2=1463181&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
(original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Mon Apr
1 16:14:19 2013
@@ -36,22 +36,29 @@ class AddressHelper
enum CheckMode {FOR_RECEIVER, FOR_SENDER};
AddressHelper(const Address& address);
- bool createEnabled(CheckMode mode) const;
- bool deleteEnabled(CheckMode mode) const;
- bool assertEnabled(CheckMode mode) const;
+ void configure(pn_terminus_t* terminus, CheckMode mode);
+ void checkAssertion(pn_terminus_t* terminus, CheckMode mode);
- void setNodeProperties(pn_terminus_t*, bool dynamic);
const qpid::types::Variant::Map& getNodeProperties() const;
const qpid::types::Variant::Map& getLinkProperties() const;
private:
+ bool isTemporary;
std::string createPolicy;
std::string assertPolicy;
std::string deletePolicy;
qpid::types::Variant::Map node;
qpid::types::Variant::Map link;
+ qpid::types::Variant::Map properties;
+ qpid::types::Variant::List capabilities;
std::string name;
+ std::string type;
+ bool durableNode;
bool enabled(const std::string& policy, CheckMode mode) const;
+ bool createEnabled(CheckMode mode) const;
+ bool assertEnabled(CheckMode mode) const;
+ void setCapabilities(pn_terminus_t* terminus, bool create);
+ void setNodeProperties(pn_terminus_t* terminus);
};
}}} // namespace qpid::messaging::amqp
Modified:
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1463181&r1=1463180&r2=1463181&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
(original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
Mon Apr 1 16:14:19 2013
@@ -299,6 +299,7 @@ void ConnectionContext::attach(boost::sh
lnk->address.setName(pn_terminus_get_address(t));
QPID_LOG(debug, "Dynamic target name set to " <<
lnk->address.getName());
}
+ lnk->verify(t);
QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
@@ -316,6 +317,7 @@ void ConnectionContext::attach(boost::sh
lnk->address.setName(pn_terminus_get_address(s));
QPID_LOG(debug, "Dynamic source name set to " <<
lnk->address.getName());
}
+ lnk->verify(s);
QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}
Modified:
qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1463181&r1=1463180&r2=1463181&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
(original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Mon
Apr 1 16:14:19 2013
@@ -19,7 +19,6 @@
*
*/
#include "qpid/messaging/amqp/ReceiverContext.h"
-#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
@@ -36,6 +35,7 @@ namespace amqp {
ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n,
const qpid::messaging::Address& a)
: name(n),
address(a),
+ helper(address),
receiver(pn_receiver(session, name.c_str())),
capacity(0) {}
ReceiverContext::~ReceiverContext()
@@ -108,26 +108,17 @@ uint64_t getFilterDescriptor(const std::
return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE :
qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
}
}
-
-void ReceiverContext::configure() const
+void ReceiverContext::verify(pn_terminus_t* source)
+{
+ helper.checkAssertion(source, AddressHelper::FOR_RECEIVER);
+}
+void ReceiverContext::configure()
{
configure(pn_link_source(receiver));
}
-void ReceiverContext::configure(pn_terminus_t* source) const
+void ReceiverContext::configure(pn_terminus_t* source)
{
- //dynamic create:
- AddressHelper helper(address);
- if (AddressImpl::isTemporary(address)) {
- //application expects a name to be generated
- QPID_LOG(debug, "source is dynamic");
- helper.setNodeProperties(source, true);
- } else {
- pn_terminus_set_address(source, address.getName().c_str());
- if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) {
- //application expects name of node to be as specified
- helper.setNodeProperties(source, false);
- }
- }
+ helper.configure(source, AddressHelper::FOR_RECEIVER);
// Look specifically for qpid.selector link property and add a filter for
it
qpid::types::Variant::Map::const_iterator i =
helper.getLinkProperties().find("selector");
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL:
http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1463181&r1=1463180&r2=1463181&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
(original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Mon
Apr 1 16:14:19 2013
@@ -22,6 +22,7 @@
*
*/
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
#include <string>
#include "qpid/sys/IntegerTypes.h"
@@ -54,15 +55,17 @@ class ReceiverContext
const std::string& getName() const;
const std::string& getSource() const;
bool isClosed() const;
- void configure() const;
+ void configure();
+ void verify(pn_terminus_t*);
Address getAddress() const;
private:
friend class ConnectionContext;
const std::string name;
Address address;
+ AddressHelper helper;
pn_link_t* receiver;
uint32_t capacity;
- void configure(pn_terminus_t*) const;
+ void configure(pn_terminus_t*);
};
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1463181&r1=1463180&r2=1463181&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
(original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Mon
Apr 1 16:14:19 2013
@@ -20,7 +20,6 @@
*/
#include "qpid/messaging/amqp/SenderContext.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
-#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MessageEncoder.h"
@@ -41,6 +40,7 @@ namespace amqp {
SenderContext::SenderContext(pn_session_t* session, const std::string& n,
const qpid::messaging::Address& a)
: name(n),
address(a),
+ helper(address),
sender(pn_sender(session, n.c_str())), capacity(1000) {}
SenderContext::~SenderContext()
@@ -342,23 +342,17 @@ void SenderContext::Delivery::settle()
{
pn_delivery_settle(token);
}
-void SenderContext::configure() const
+void SenderContext::verify(pn_terminus_t* target)
+{
+ helper.checkAssertion(target, AddressHelper::FOR_SENDER);
+}
+void SenderContext::configure()
{
configure(pn_link_target(sender));
}
-void SenderContext::configure(pn_terminus_t* target) const
+void SenderContext::configure(pn_terminus_t* target)
{
- AddressHelper helper(address);
- if (AddressImpl::isTemporary(address)) {
- //application expects a name to be generated
- helper.setNodeProperties(target, true);
- } else {
- pn_terminus_set_address(target, address.getName().c_str());
- if (helper.createEnabled(AddressHelper::FOR_SENDER)) {
- //application expects name of node to be as specified
- helper.setNodeProperties(target, false);
- }
- }
+ helper.configure(target, AddressHelper::FOR_SENDER);
}
bool SenderContext::settled()
Modified: qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL:
http://svn.apache.org/viewvc/qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1463181&r1=1463180&r2=1463181&view=diff
==============================================================================
--- qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
(original)
+++ qpid/branches/0.22/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Mon Apr
1 16:14:19 2013
@@ -26,6 +26,7 @@
#include <vector>
#include "qpid/sys/IntegerTypes.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
struct pn_delivery_t;
@@ -69,7 +70,8 @@ class SenderContext
const std::string& getName() const;
const std::string& getTarget() const;
Delivery* send(const qpid::messaging::Message& message);
- void configure() const;
+ void configure();
+ void verify(pn_terminus_t*);
bool settled();
Address getAddress() const;
private:
@@ -78,13 +80,14 @@ class SenderContext
const std::string name;
qpid::messaging::Address address;
+ AddressHelper helper;
pn_link_t* sender;
int32_t nextId;
Deliveries deliveries;
uint32_t capacity;
uint32_t processUnsettled();
- void configure(pn_terminus_t*) const;
+ void configure(pn_terminus_t*);
};
}}} // namespace qpid::messaging::amqp
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]