Repository: qpid-proton
Updated Branches:
  refs/heads/master a39d6ac5e -> 6e6cb0499
PROTON-1183: source::filters moved from receiver Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6e6cb049 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6e6cb049 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6e6cb049 Branch: refs/heads/master Commit: 6e6cb049901ea9f319adaddb7e91f77f740f6f78 Parents: a39d6ac Author: Clifford Jansen <[email protected]> Authored: Thu Apr 28 19:34:44 2016 -0700 Committer: Clifford Jansen <[email protected]> Committed: Thu Apr 28 19:34:44 2016 -0700 ---------------------------------------------------------------------- examples/cpp/selected_recv.cpp | 29 ++++++++++++++++++-- .../cpp/include/proton/receiver_options.hpp | 9 ------ proton-c/bindings/cpp/include/proton/source.hpp | 9 ++++++ .../cpp/include/proton/source_options.hpp | 8 ++++-- .../bindings/cpp/include/proton/terminus.hpp | 21 +++----------- proton-c/bindings/cpp/src/node_options.cpp | 8 ++++++ proton-c/bindings/cpp/src/receiver_options.cpp | 1 - proton-c/bindings/cpp/src/source.cpp | 10 +++++++ proton-c/bindings/cpp/src/terminus.cpp | 15 ++++------ 9 files changed, 70 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/examples/cpp/selected_recv.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/selected_recv.cpp b/examples/cpp/selected_recv.cpp index f02760f..c450cba 100644 --- a/examples/cpp/selected_recv.cpp +++ b/examples/cpp/selected_recv.cpp @@ -24,11 +24,35 @@ #include "proton/handler.hpp" #include "proton/url.hpp" #include "proton/receiver_options.hpp" +#include "proton/source_options.hpp" #include <iostream> #include "fake_cpp11.hpp" +namespace { + + // Example custom function to configure an AMQP filter, + // specifically an APACHE.ORG:SELECTOR + // 
(http://www.amqp.org/specification/1.0/filters) + + void set_filter(proton::source_options &opts, const std::string& selector_str) { + proton::source::filter_map map; + proton::symbol filter_key("selector"); + proton::value filter_value; + // The value is a specific AMQP "described type": binary string with symbolic descriptor + proton::codec::encoder enc(filter_value); + enc << proton::codec::start::described() + << proton::symbol("apache.org:selector-filter:string") + << proton::binary(selector_str) + << proton::codec::finish(); + // In our case, the map has this one element + map[filter_key] = filter_value; + opts.filters(map); + } +} + + class selected_recv : public proton::handler { private: proton::url url; @@ -37,9 +61,10 @@ class selected_recv : public proton::handler { selected_recv(const proton::url& u) : url(u) {} void on_container_start(proton::container &c) override { + proton::source_options custom_selector; + set_filter(custom_selector, "colour = 'green'"); proton::connection conn = c.connect(url); - // Note: the following signature is changing in Proton 0.13 - conn.open_receiver(url.path(), proton::receiver_options().selector("colour = 'green'")); + conn.open_receiver(url.path(), proton::receiver_options().source(custom_selector)); } void on_message(proton::delivery &d, proton::message &m) override { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/include/proton/receiver_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/receiver_options.hpp b/proton-c/bindings/cpp/include/proton/receiver_options.hpp index bb9ac3f..babc602 100644 --- a/proton-c/bindings/cpp/include/proton/receiver_options.hpp +++ b/proton-c/bindings/cpp/include/proton/receiver_options.hpp @@ -91,15 +91,6 @@ class receiver_options { /// Set the delivery mode on the receiver. 
PN_CPP_EXTERN receiver_options& delivery_mode(delivery_mode); - /// @cond INTERNAL - /// XXX need to discuss spec issues, jms versus amqp filters - /// - /// Set a selector on the receiver to str. This sets a single - /// registered filter on the link of type - /// apache.org:selector-filter with value str. - PN_CPP_EXTERN receiver_options& selector(const std::string&); - /// @endcond - /// Automatically accept inbound messages that aren't otherwise /// released, rejected or modified (default value:true). PN_CPP_EXTERN receiver_options& auto_accept(bool); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/include/proton/source.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/source.hpp b/proton-c/bindings/cpp/include/proton/source.hpp index 218610d..ef6f1a3 100644 --- a/proton-c/bindings/cpp/include/proton/source.hpp +++ b/proton-c/bindings/cpp/include/proton/source.hpp @@ -26,6 +26,7 @@ #include "proton/object.hpp" #include "proton/value.hpp" #include "proton/terminus.hpp" +#include <proton/map.hpp> #include <string> @@ -40,8 +41,16 @@ class receiver; /// @see proton::sender proton::receiver proton::target class source : public terminus { public: + /// A map of AMQP symbol keys and filter specifiers. + typedef std::map<symbol, value> filter_map; + source() : terminus() {} + + /// The address of the source. PN_CPP_EXTERN std::string address() const; + + /// Obtain the set of message filters. 
+ PN_CPP_EXTERN filter_map filters() const; /// @cond INTERNAL private: source(pn_terminus_t* t); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/include/proton/source_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/source_options.hpp b/proton-c/bindings/cpp/include/proton/source_options.hpp index 0dc945b..b1676b7 100644 --- a/proton-c/bindings/cpp/include/proton/source_options.hpp +++ b/proton-c/bindings/cpp/include/proton/source_options.hpp @@ -27,7 +27,7 @@ #include "proton/pn_unique_ptr.hpp" #include "proton/types.hpp" #include "proton/delivery_mode.hpp" -#include "proton/terminus.hpp" +#include "proton/source.hpp" #include <vector> #include <string> @@ -35,7 +35,7 @@ namespace proton { class proton_handler; -class source; + /// Options for creating a source node for a sender or receiver. /// @@ -76,6 +76,10 @@ class source_options { /// Control when the clock for expiration begins. PN_CPP_EXTERN source_options& expiry_policy(enum expiry_policy); + /// Specify a filter mechanism on the source that restricts + /// message flow to a subset of the available messages. + PN_CPP_EXTERN source_options& filters(const source::filter_map&); + /// @cond INTERNAL private: void apply(source&) const; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/include/proton/terminus.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/terminus.hpp b/proton-c/bindings/cpp/include/proton/terminus.hpp index d73717b..7ee60f4 100644 --- a/proton-c/bindings/cpp/include/proton/terminus.hpp +++ b/proton-c/bindings/cpp/include/proton/terminus.hpp @@ -32,10 +32,6 @@ namespace proton { -namespace internal { -class noderef; -} - /// One end of a link, either a source or a target. 
/// /// The source terminus is where messages originate; the target @@ -63,32 +59,23 @@ class terminus { /// Get the durability flag. PN_CPP_EXTERN enum durability_mode durability_mode(); - /// Get the source or target node's address. - PN_CPP_EXTERN std::string address() const; - /// True if the remote node is created dynamically. PN_CPP_EXTERN bool dynamic() const; /// Obtain a reference to the AMQP dynamic node properties for the /// terminus. See also lifetime_policy. - PN_CPP_EXTERN const value& node_properties() const; - - /// Obtain a reference to the AMQP filter set for the terminus. - /// See also selector. - PN_CPP_EXTERN const value& filter() const; + PN_CPP_EXTERN value node_properties() const; /// @cond INTERNAL protected: pn_terminus_t *pn_object() { return object_; } private: pn_terminus_t* object_; - value properties_, filter_; pn_link_t* parent_; - - friend class internal::factory<terminus>; - friend class source; - friend class target; + friend class internal::factory<terminus>; + friend class source; + friend class target; /// @endcond }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/src/node_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/node_options.cpp b/proton-c/bindings/cpp/src/node_options.cpp index b8438fa..7eebec6 100644 --- a/proton-c/bindings/cpp/src/node_options.cpp +++ b/proton-c/bindings/cpp/src/node_options.cpp @@ -96,6 +96,7 @@ class source_options::impl { option<duration> timeout; option<enum expiry_policy> expiry_policy; option<enum distribution_mode> distribution_mode; + option<source::filter_map> filters; void apply(source& s) { node_address(s, address, dynamic); @@ -103,6 +104,11 @@ class source_options::impl { node_expiry(s, expiry_policy, timeout); if (distribution_mode.set) pn_terminus_set_distribution_mode(unwrap(s), pn_distribution_mode_t(distribution_mode.value)); + if (filters.set && 
!filters.value.empty()) { + // Applied at most once via source_option. No need to clear. + codec::encoder e(pn_terminus_filter(unwrap(s))); + e << filters.value; + } } void update(const impl& x) { @@ -112,6 +118,7 @@ class source_options::impl { timeout.update(x.timeout); expiry_policy.update(x.expiry_policy); distribution_mode.update(x.distribution_mode); + filters.update(x.filters); } }; @@ -135,6 +142,7 @@ source_options& source_options::durability_mode(enum durability_mode m) { impl_- source_options& source_options::timeout(duration d) { impl_->timeout = d; return *this; } source_options& source_options::expiry_policy(enum expiry_policy m) { impl_->expiry_policy = m; return *this; } source_options& source_options::distribution_mode(enum distribution_mode m) { impl_->distribution_mode = m; return *this; } +source_options& source_options::filters(const source::filter_map &map) { impl_->filters = map; return *this; } void source_options::apply(source& s) const { impl_->apply(s); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/src/receiver_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp index e2eb416..733bd00 100644 --- a/proton-c/bindings/cpp/src/receiver_options.cpp +++ b/proton-c/bindings/cpp/src/receiver_options.cpp @@ -130,7 +130,6 @@ receiver_options& receiver_options::auto_settle(bool b) {impl_->auto_settle = b; receiver_options& receiver_options::credit_window(int w) {impl_->credit_window = w; return *this; } receiver_options& receiver_options::source(source_options &s) {impl_->source = s; return *this; } receiver_options& receiver_options::target(target_options &s) {impl_->target = s; return *this; } -receiver_options& receiver_options::selector(const std::string&) { return *this; } void receiver_options::apply(receiver& r) const { impl_->apply(r); } 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/src/source.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/source.cpp b/proton-c/bindings/cpp/src/source.cpp index 26de203..3816c78 100644 --- a/proton-c/bindings/cpp/src/source.cpp +++ b/proton-c/bindings/cpp/src/source.cpp @@ -43,4 +43,14 @@ std::string source::address() const { return str(pn_terminus_get_address(authoritative)); } +source::filter_map source::filters() const { + codec::decoder d(pn_terminus_filter(object_)); + filter_map map; + if (!d.empty()) { + d.rewind(); + d >> map; + } + return map; +} + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e6cb049/proton-c/bindings/cpp/src/terminus.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/terminus.cpp b/proton-c/bindings/cpp/src/terminus.cpp index f9460bb..f1cc61a 100644 --- a/proton-c/bindings/cpp/src/terminus.cpp +++ b/proton-c/bindings/cpp/src/terminus.cpp @@ -27,7 +27,7 @@ namespace proton { terminus::terminus(pn_terminus_t* t) : - object_(t), properties_(pn_terminus_properties(t)), filter_(pn_terminus_filter(t)), parent_(0) + object_(t), parent_(0) {} enum expiry_policy terminus::expiry_policy() const { @@ -46,17 +46,14 @@ enum durability_mode terminus::durability_mode() { return (enum durability_mode) pn_terminus_get_durability(object_); } -std::string terminus::address() const { - return str(pn_terminus_get_address(object_)); -} - bool terminus::dynamic() const { return pn_terminus_is_dynamic(object_); } -const value& terminus::filter() const { return filter_; } - -const value& terminus::node_properties() const { return properties_; } - +value terminus::node_properties() const { + value x(pn_terminus_properties(object_)); + pn_terminus_properties(object_); // ZZZ + return x; +} } --------------------------------------------------------------------- To unsubscribe, e-mail: 
[email protected] For additional commands, e-mail: [email protected]
