Repository: qpid-proton Updated Branches: refs/heads/master 25d39a8e9 -> 6f189609d
PROTON-1089: C++ binding link options part 2, filters, dynamic node properties Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6f189609 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6f189609 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6f189609 Branch: refs/heads/master Commit: 6f189609d5deedfa444795230477aabdeabe5449 Parents: 25d39a8 Author: Clifford Jansen <[email protected]> Authored: Mon Jan 11 09:13:22 2016 -0800 Committer: Clifford Jansen <[email protected]> Committed: Mon Jan 11 09:16:45 2016 -0800 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 1 + examples/cpp/selected_recv.cpp | 57 ++++++++++++++++++++ .../cpp/include/proton/link_options.hpp | 23 +++++++- .../bindings/cpp/include/proton/terminus.hpp | 8 ++- proton-c/bindings/cpp/src/link_options.cpp | 55 ++++++++++++++++++- proton-c/bindings/cpp/src/terminus.cpp | 16 ++++++ 6 files changed, 155 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6f189609/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 4a66925..48049d4 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -37,6 +37,7 @@ set(examples recurring_timer connection_options queue_browser + selected_recv ssl ssl_client_cert encode_decode) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6f189609/examples/cpp/selected_recv.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/selected_recv.cpp b/examples/cpp/selected_recv.cpp new file mode 100644 index 0000000..d591fd6 --- /dev/null +++ b/examples/cpp/selected_recv.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/container.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/url.hpp" +#include "proton/link_options.hpp" + +#include <iostream> + +class selected_recv : public proton::messaging_handler { + private: + proton::url url; + + public: + + selected_recv(const proton::url& u) : url(u) {} + + void on_start(proton::event &e) { + proton::connection conn = e.container().connect(url); + conn.open_receiver(url.path(), proton::link_options().selector("colour = 'green'")); + } + + void on_message(proton::event &e) { + std::cout << e.message().body() << std::endl; + } +}; + +int main(int argc, char **argv) { + try { + std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples"; + selected_recv recv(url); + proton::container(recv).run(); + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6f189609/proton-c/bindings/cpp/include/proton/link_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/link_options.hpp b/proton-c/bindings/cpp/include/proton/link_options.hpp index 98790db..f3c5da6 100644 --- a/proton-c/bindings/cpp/include/proton/link_options.hpp +++ b/proton-c/bindings/cpp/include/proton/link_options.hpp @@ -43,6 +43,21 @@ enum link_delivery_mode_t { AT_LEAST_ONCE }; +/** The lifetime of dynamically created nodes. */ +enum lifetime_policy_t { + // The policy is unspecified. + UNSPECIFIED = 0, + // The lifetime of the dynamic node is scoped to lifetime of the creating link. + DELETE_ON_CLOSE = 0x2B, + // The node will be deleted when it is neither the source nor the target of any link. + DELETE_ON_NO_LINKS = 0x2C, + // The node will be deleted when the creating link no longer exists and no messages remain at the node. + DELETE_ON_NO_MESSAGES = 0x2D, + // The node will be deleted when there are no links which have this node as + // their source or target, and there remain no messages at the node. + DELETE_ON_NO_LINKS_OR_MESSAGES = 0x2E +}; + class handler; class link; @@ -87,11 +102,15 @@ class link_options { PN_CPP_EXTERN link_options& durable_subscription(bool); /* Set the delivery mode on the link. */ PN_CPP_EXTERN link_options& delivery_mode(link_delivery_mode_t); - /* Receiver-only option to request a dynamically generated node at the peer. */ + /* Request a dynamically generated node at the peer. */ PN_CPP_EXTERN link_options& dynamic_address(bool); + /* Set the lifetime policy for a receiver to a dynamically created node. */ + PN_CPP_EXTERN link_options& lifetime_policy(lifetime_policy_t); /* Set the local address for the link. */ PN_CPP_EXTERN link_options& local_address(const std::string &addr); - // TODO: selector/filter, dynamic node properties + /* Set a selector on the receiver to str. This sets a single registered filter on the link of + type apache.org:selector-filter with value str. */ + PN_CPP_EXTERN link_options& selector(const std::string &str); private: friend class link; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6f189609/proton-c/bindings/cpp/include/proton/terminus.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/terminus.hpp b/proton-c/bindings/cpp/include/proton/terminus.hpp index 3aeda23..8459909 100644 --- a/proton-c/bindings/cpp/include/proton/terminus.hpp +++ b/proton-c/bindings/cpp/include/proton/terminus.hpp @@ -25,6 +25,7 @@ #include "proton/link.h" #include "proton/object.hpp" +#include "proton/data.hpp" #include <string> namespace proton { @@ -72,6 +73,8 @@ class terminus PN_CPP_EXTERN void type(type_t); PN_CPP_EXTERN expiry_policy_t expiry_policy() const; PN_CPP_EXTERN void expiry_policy(expiry_policy_t); + PN_CPP_EXTERN uint32_t timeout() const; + PN_CPP_EXTERN void timeout(uint32_t seconds); PN_CPP_EXTERN distribution_mode_t distribution_mode() const; PN_CPP_EXTERN void distribution_mode(distribution_mode_t); PN_CPP_EXTERN durability_t durability(); @@ -80,7 +83,10 @@ class terminus PN_CPP_EXTERN void address(const std::string &); PN_CPP_EXTERN bool dynamic() const; PN_CPP_EXTERN void dynamic(bool); - // TODO: filter + related selector + /** Obtain a reference to the AMQP dynamic node properties for the terminus. See also link_options::lifetime_policy. */ + PN_CPP_EXTERN data dynamic_node_properties(); + /** Obtain a reference to the AMQP filter set for the terminus. See also link_options::selector. */ + PN_CPP_EXTERN data filter(); private: pn_terminus_t* object_; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6f189609/proton-c/bindings/cpp/src/link_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/link_options.cpp b/proton-c/bindings/cpp/src/link_options.cpp index 1eabdf4..297a6c7 100644 --- a/proton-c/bindings/cpp/src/link_options.cpp +++ b/proton-c/bindings/cpp/src/link_options.cpp @@ -25,6 +25,28 @@ namespace proton { +namespace { +std::string lifetime_policy_symbol(lifetime_policy_t lp) { + switch (lp) { + case DELETE_ON_CLOSE: return "amqp:delete-on-close:list"; + case DELETE_ON_NO_LINKS: return "amqp:delete-on-no-links:list"; + case DELETE_ON_NO_MESSAGES: return "amqp:delete-on-no-messages:list"; + case DELETE_ON_NO_LINKS_OR_MESSAGES: return "amqp:delete-on-no-links-or-messages:list"; + default: break; + } + return ""; +} + +std::string distribution_mode_symbol(terminus::distribution_mode_t dm) { + switch (dm) { + case terminus::COPY: return "copy"; + case terminus::MOVE: return "move"; + default: break; + } + return ""; +} +} + template <class T> struct option { T value; bool set; @@ -42,6 +64,8 @@ class link_options::impl { option<link_delivery_mode_t> delivery_mode; option<bool> dynamic_address; option<std::string> local_address; + option<lifetime_policy_t> lifetime_policy; + option<std::string> selector; void apply(link& l) { if (l.state() & endpoint::LOCAL_UNINIT) { @@ -72,6 +96,24 @@ class link_options::impl { else l.detach_handler(); } + if (dynamic_address.set) { + terminus t = sender ? l.target() : l.source(); + t.dynamic(dynamic_address.value); + if (dynamic_address.value) { + std::string lp, dm; + if (lifetime_policy.set) lp = lifetime_policy_symbol(lifetime_policy.value); + if (!sender && distribution_mode.set) dm = distribution_mode_symbol(distribution_mode.value); + if (lp.size() || dm.size()) { + encoder enc = t.dynamic_node_properties().encoder(); + enc << start::map(); + if (dm.size()) + enc << amqp_symbol("supported-dist-modes") << amqp_string(dm); + if (lp.size()) + enc << amqp_symbol("lifetime-policy") << start::described() + << amqp_symbol(lp) << start::list() << finish(); + } + } + } if (!sender) { // receiver only options if (distribution_mode.set) l.source().distribution_mode(distribution_mode.value); @@ -79,8 +121,13 @@ class link_options::impl { l.source().durability(terminus::DELIVERIES); l.source().expiry_policy(terminus::EXPIRE_NEVER); } - if (dynamic_address.set) - l.source().dynamic(dynamic_address.value); + if (selector.set && selector.value.size()) { + data d = l.source().filter(); + d.clear(); + encoder enc = d.encoder(); + enc << start::map() << amqp_symbol("selector") << start::described() + << amqp_symbol("apache.org:selector-filter:string") << amqp_binary(selector.value) << finish(); + } } } } @@ -92,6 +139,8 @@ class link_options::impl { delivery_mode.override(x.delivery_mode); dynamic_address.override(x.dynamic_address); local_address.override(x.local_address); + lifetime_policy.override(x.lifetime_policy); + selector.override(x.selector); } }; @@ -116,6 +165,8 @@ link_options& link_options::durable_subscription(bool b) {impl_->durable_subscri link_options& link_options::delivery_mode(link_delivery_mode_t m) {impl_->delivery_mode = m; return *this; } link_options& link_options::dynamic_address(bool b) {impl_->dynamic_address = b; return *this; } link_options& link_options::local_address(const std::string &addr) {impl_->local_address = addr; return *this; } +link_options& link_options::lifetime_policy(lifetime_policy_t lp) {impl_->lifetime_policy = lp; return *this; } +link_options& link_options::selector(const std::string &str) {impl_->selector = str; return *this; } void link_options::apply(link& l) const { impl_->apply(l); } handler* link_options::handler() const { return impl_->handler.value; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6f189609/proton-c/bindings/cpp/src/terminus.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/terminus.cpp b/proton-c/bindings/cpp/src/terminus.cpp index cae148a..062f376 100644 --- a/proton-c/bindings/cpp/src/terminus.cpp +++ b/proton-c/bindings/cpp/src/terminus.cpp @@ -40,6 +40,14 @@ void terminus::expiry_policy(expiry_policy_t policy) { pn_terminus_set_expiry_policy(object_, pn_expiry_policy_t(policy)); } +uint32_t terminus::timeout() const { + return pn_terminus_get_timeout(object_); +} + +void terminus::timeout(uint32_t seconds) { + pn_terminus_set_timeout(object_, seconds); +} + terminus::distribution_mode_t terminus::distribution_mode() const { return distribution_mode_t(pn_terminus_get_distribution_mode(object_)); } @@ -73,4 +81,12 @@ void terminus::dynamic(bool d) { pn_terminus_set_dynamic(object_, d); } +data terminus::filter() { + return pn_terminus_filter(object_); +} + +data terminus::dynamic_node_properties() { + return pn_terminus_properties(object_); +} + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
