PROTON-1863: [cpp] need support for anonymous termini. Added anonymous() to source/target_options, which sets the terminus address to NULL. Also fixed the dynamic() option to set the address to NULL.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/21eb6d8b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/21eb6d8b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/21eb6d8b Branch: refs/heads/go1 Commit: 21eb6d8beee5e7e42aaf51d34c566bf1e8764901 Parents: 9364588 Author: Alan Conway <[email protected]> Authored: Thu Jun 14 14:45:07 2018 -0400 Committer: Alan Conway <[email protected]> Committed: Thu Jun 14 15:27:46 2018 -0400 ---------------------------------------------------------------------- cpp/include/proton/source_options.hpp | 5 + cpp/include/proton/target_options.hpp | 7 +- cpp/include/proton/terminus.hpp | 3 + cpp/src/connection_driver_test.cpp | 148 +++++++++++++++++++++++------ cpp/src/node_options.cpp | 18 ++-- cpp/src/terminus.cpp | 4 + 6 files changed, 149 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/include/proton/source_options.hpp ---------------------------------------------------------------------- diff --git a/cpp/include/proton/source_options.hpp b/cpp/include/proton/source_options.hpp index dd9d9d0..fe1c34a 100644 --- a/cpp/include/proton/source_options.hpp +++ b/cpp/include/proton/source_options.hpp @@ -63,6 +63,11 @@ class source_options { /// ignored. PN_CPP_EXTERN source_options& dynamic(bool); + /// Request an anonymous node on the remote peer. + /// The default is false. Any specified target address() is + /// ignored if true. + PN_CPP_EXTERN source_options& anonymous(bool); + /// Control whether messages are browsed or consumed. The /// default is source::MOVE, meaning consumed. 
PN_CPP_EXTERN source_options& distribution_mode(enum source::distribution_mode); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/include/proton/target_options.hpp ---------------------------------------------------------------------- diff --git a/cpp/include/proton/target_options.hpp b/cpp/include/proton/target_options.hpp index f9b8895..834a185 100644 --- a/cpp/include/proton/target_options.hpp +++ b/cpp/include/proton/target_options.hpp @@ -60,9 +60,14 @@ class target_options { /// Request that a node be dynamically created by the remote peer. /// The default is false. Any specified target address() is - /// ignored. + /// ignored if true. PN_CPP_EXTERN target_options& dynamic(bool); + /// Request an anonymous node on the remote peer. + /// The default is false. Any specified target address() is + /// ignored if true. + PN_CPP_EXTERN target_options& anonymous(bool); + /// Control the persistence of the target node. The default is /// target::NONDURABLE, meaning non-persistent. PN_CPP_EXTERN target_options& durability_mode(enum target::durability_mode); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/include/proton/terminus.hpp ---------------------------------------------------------------------- diff --git a/cpp/include/proton/terminus.hpp b/cpp/include/proton/terminus.hpp index d0f755c..5b9c684 100644 --- a/cpp/include/proton/terminus.hpp +++ b/cpp/include/proton/terminus.hpp @@ -93,6 +93,9 @@ class terminus { /// True if the remote node is created dynamically. PN_CPP_EXTERN bool dynamic() const; + /// True if the remote node is an anonymous-relay + PN_CPP_EXTERN bool anonymous() const; + /// Obtain a reference to the AMQP dynamic node properties for the /// terminus. See also lifetime_policy. 
PN_CPP_EXTERN value node_properties() const; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/src/connection_driver_test.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/connection_driver_test.cpp b/cpp/src/connection_driver_test.cpp index 5a4bfcb..5be857e 100644 --- a/cpp/src/connection_driver_test.cpp +++ b/cpp/src/connection_driver_test.cpp @@ -169,6 +169,8 @@ struct record_handler : public messaging_handler { std::deque<std::string> unhandled_errors, transport_errors, connection_errors; std::deque<proton::message> messages; + size_t link_count() const { return senders.size() + receivers.size(); } + void on_receiver_open(receiver &l) PN_CPP_OVERRIDE { messaging_handler::on_receiver_open(l); receivers.push_back(l); @@ -331,55 +333,143 @@ void test_spin_interrupt() { } catch (const test::error&) {} } -void test_link_options() { - // Propagation of link and terminus properties +#define ASSERT_ADDR(ADDR, TERMINUS) do { \ + ASSERT_EQUAL((ADDR), (TERMINUS).address()); \ + if ((ADDR) == std::string()) ASSERT((TERMINUS).anonymous()); \ + else ASSERT(!(TERMINUS).anonymous()); \ + } while(0); + +#define ASSERT_LINK(SRC, TGT, LINK) do { \ + ASSERT_ADDR((SRC), (LINK).source()); \ + ASSERT_ADDR((TGT), (LINK).target()); \ + } while(0); + +void test_link_address() { + record_handler ha, hb; + driver_pair d(ha, hb); + + // FIXME aconway 2018-06-14: also fixes PROTON-1679? 
+ + // Using open(address, opts) + d.a.connection().open_sender("tx", sender_options().name("_x").source(source_options().address("sx"))); + d.a.connection().open_receiver("sy", receiver_options().name("_y").target(target_options().address("ty"))); + while (ha.link_count()+hb.link_count() < 4) d.process(); + + proton::sender ax = quick_pop(ha.senders); + ASSERT_EQUAL("_x", ax.name()); + ASSERT_LINK("sx", "tx", ax); + proton::receiver bx = quick_pop(hb.receivers); + ASSERT_EQUAL("_x", bx.name()); + ASSERT_LINK("sx", "tx", bx); + + proton::receiver ay = quick_pop(ha.receivers); + ASSERT_EQUAL("_y", ay.name()); + ASSERT_LINK("sy", "ty", ay); + proton::sender by = quick_pop(hb.senders); + ASSERT_EQUAL("_y", by.name()); + ASSERT_LINK("sy", "ty", by); + + // Override address parameter in opts + d.a.connection().open_sender("x", sender_options().target(target_options().address("X"))); + d.a.connection().open_receiver("y", receiver_options().source(source_options().address("Y"))); + while (ha.link_count()+hb.link_count() < 4) d.process(); + + ax = quick_pop(ha.senders); + ASSERT_LINK("", "X", ax); + bx = quick_pop(hb.receivers); + ASSERT_LINK("", "X", bx); + + ay = quick_pop(ha.receivers); + ASSERT_LINK("Y", "", ay); + by = quick_pop(hb.senders); + ASSERT_LINK("Y", "", by); +} + +void test_link_anonymous_dynamic() { record_handler ha, hb; driver_pair d(ha, hb); + // Anonymous link should have NULL address + d.a.connection().open_sender("x", sender_options().target(target_options().anonymous(true))); + d.a.connection().open_receiver("y", receiver_options().source(source_options().anonymous(true))); + while (ha.link_count()+hb.link_count() < 4) d.process(); + + proton::sender ax = quick_pop(ha.senders); + ASSERT_LINK("", "", ax); + proton::receiver bx = quick_pop(hb.receivers); + ASSERT_LINK("", "", bx); + + proton::receiver ay = quick_pop(ha.receivers); + ASSERT_LINK("", "", ay); + proton::sender by = quick_pop(hb.senders); + ASSERT_LINK("", "", by); + + // Dynamic link 
should have NULL address and dynamic flag + d.a.connection().open_sender("x", sender_options().target(target_options().dynamic(true))); + d.a.connection().open_receiver("y", receiver_options().source(source_options().dynamic(true))); + while (ha.link_count()+hb.link_count() < 4) d.process(); + + ax = quick_pop(ha.senders); + ASSERT(ax.target().dynamic()); + ASSERT_LINK("", "", ax); + bx = quick_pop(hb.receivers); + ASSERT(bx.target().dynamic()); + ASSERT_LINK("", "", bx); + + ay = quick_pop(ha.receivers); + ASSERT(ay.source().dynamic()); + ASSERT_LINK("", "", ay); + by = quick_pop(hb.senders); + ASSERT(by.source().dynamic()); + ASSERT_LINK("", "", by); + + // Empty string as a link address is allowed and not considered anonymous. + d.a.connection().open_sender("", sender_options()); + d.a.connection().open_receiver("", receiver_options()); + while (ha.link_count()+hb.link_count() < 4) d.process(); + + ax = quick_pop(ha.senders); + ASSERT(ax.target().address().empty()); + ASSERT(!ax.target().anonymous()); + + ay = quick_pop(ha.receivers); + ASSERT(ay.source().address().empty()); + ASSERT(!ay.source().anonymous()); +} + +void test_link_capability_filter() { + record_handler ha, hb; + driver_pair d(ha, hb); + + // Capabilities and filters std::vector<proton::symbol> caps; caps.push_back("foo"); caps.push_back("bar"); - source::filter_map f; - f.put("xx", "xxx"); - ASSERT_EQUAL(1U, f.size()); - d.a.connection().open_sender( - "x", sender_options().name("_x").target(target_options().capabilities(caps))); - - f.clear(); - f.put("yy", "yyy"); - ASSERT_EQUAL(1U, f.size()); - d.a.connection().open_receiver( - "y", receiver_options().name("_y").source(source_options().filters(f).capabilities(caps))); + d.a.connection().open_sender("x", sender_options().target(target_options().capabilities(caps))); - while (ha.senders.size()+ha.receivers.size() < 2 || - hb.senders.size()+hb.receivers.size() < 2) - d.process(); + source::filter_map f; + f.put("1", "11"); + f.put("2", "22"); + 
d.a.connection().open_receiver("y", receiver_options().source(source_options().filters(f).capabilities(caps))); + while (ha.link_count()+hb.link_count() < 4) d.process(); proton::sender ax = quick_pop(ha.senders); - ASSERT_EQUAL("_x", ax.name()); - ASSERT_EQUAL("x", ax.target().address()); ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ax.target().capabilities()); proton::receiver ay = quick_pop(ha.receivers); - ASSERT_EQUAL("_y", ay.name()); - ASSERT_EQUAL("y", ay.source().address()); ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ay.source().capabilities()); proton::receiver bx = quick_pop(hb.receivers); - ASSERT_EQUAL("x", bx.target().address()); ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", bx.target().capabilities()); - ASSERT_EQUAL("_x", bx.name()); - ASSERT_EQUAL("", bx.source().address()); ASSERT_EQUAL(many<proton::symbol>(), bx.source().capabilities()); proton::sender by = quick_pop(hb.senders); - ASSERT_EQUAL("y", by.source().address()); ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", by.source().capabilities()); - ASSERT_EQUAL("_y", by.name()); f = by.source().filters(); - ASSERT_EQUAL(1U, f.size()); - ASSERT_EQUAL(value("yyy"), f.get("yy")); + ASSERT_EQUAL(2U, f.size()); + ASSERT_EQUAL(value("11"), f.get("1")); + ASSERT_EQUAL(value("22"), f.get("2")); } void test_message() { @@ -457,7 +547,9 @@ int main(int argc, char** argv) { RUN_ARGV_TEST(failed, test_driver_disconnected()); RUN_ARGV_TEST(failed, test_no_container()); RUN_ARGV_TEST(failed, test_spin_interrupt()); - RUN_ARGV_TEST(failed, test_link_options()); + RUN_ARGV_TEST(failed, test_link_address()); + RUN_ARGV_TEST(failed, test_link_anonymous_dynamic()); + RUN_ARGV_TEST(failed, test_link_capability_filter()); RUN_ARGV_TEST(failed, test_message()); RUN_ARGV_TEST(failed, test_message_timeout_succeed()); RUN_ARGV_TEST(failed, test_message_timeout_fail()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/src/node_options.cpp 
---------------------------------------------------------------------- diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp index 2156509..3b6d197 100644 --- a/cpp/src/node_options.cpp +++ b/cpp/src/node_options.cpp @@ -63,13 +63,13 @@ namespace { // Options common to sources and targets -void node_address(terminus &t, option<std::string> &addr, option<bool> &dynamic) { +void node_address(terminus &t, option<std::string> &addr, option<bool> &dynamic, option<bool> &anonymous) { if (dynamic.set && dynamic.value) { pn_terminus_set_dynamic(unwrap(t), true); - // Ignore any addr value for dynamic. - return; - } - if (addr.set) { + pn_terminus_set_address(unwrap(t), NULL); + } else if (anonymous.set && anonymous.value) { + pn_terminus_set_address(unwrap(t), NULL); + } else if (addr.set) { pn_terminus_set_address(unwrap(t), addr.value.c_str()); } } @@ -90,6 +90,7 @@ class source_options::impl { public: option<std::string> address; option<bool> dynamic; + option<bool> anonymous; option<enum source::durability_mode> durability_mode; option<duration> timeout; option<enum source::expiry_policy> expiry_policy; @@ -98,7 +99,7 @@ class source_options::impl { option<std::vector<symbol> > capabilities; void apply(source& s) { - node_address(s, address, dynamic); + node_address(s, address, dynamic, anonymous); node_durability(s, durability_mode); node_expiry(s, expiry_policy, timeout); if (distribution_mode.set) @@ -126,6 +127,7 @@ source_options& source_options::operator=(const source_options& x) { source_options& source_options::address(const std::string &addr) { impl_->address = addr; return *this; } source_options& source_options::dynamic(bool b) { impl_->dynamic = b; return *this; } +source_options& source_options::anonymous(bool b) { impl_->anonymous = b; return *this; } source_options& source_options::durability_mode(enum source::durability_mode m) { impl_->durability_mode = m; return *this; } source_options& source_options::timeout(duration d) { impl_->timeout = 
d; return *this; } source_options& source_options::expiry_policy(enum source::expiry_policy m) { impl_->expiry_policy = m; return *this; } @@ -141,13 +143,14 @@ class target_options::impl { public: option<std::string> address; option<bool> dynamic; + option<bool> anonymous; option<enum target::durability_mode> durability_mode; option<duration> timeout; option<enum target::expiry_policy> expiry_policy; option<std::vector<symbol> > capabilities; void apply(target& t) { - node_address(t, address, dynamic); + node_address(t, address, dynamic, anonymous); node_durability(t, durability_mode); node_expiry(t, expiry_policy, timeout); if (capabilities.set) { @@ -169,6 +172,7 @@ target_options& target_options::operator=(const target_options& x) { target_options& target_options::address(const std::string &addr) { impl_->address = addr; return *this; } target_options& target_options::dynamic(bool b) { impl_->dynamic = b; return *this; } +target_options& target_options::anonymous(bool b) { impl_->anonymous = b; return *this; } target_options& target_options::durability_mode(enum target::durability_mode m) { impl_->durability_mode = m; return *this; } target_options& target_options::timeout(duration d) { impl_->timeout = d; return *this; } target_options& target_options::expiry_policy(enum target::expiry_policy m) { impl_->expiry_policy = m; return *this; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/21eb6d8b/cpp/src/terminus.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/terminus.cpp b/cpp/src/terminus.cpp index 413c909..f04694f 100644 --- a/cpp/src/terminus.cpp +++ b/cpp/src/terminus.cpp @@ -48,6 +48,10 @@ bool terminus::dynamic() const { return pn_terminus_is_dynamic(object_); } +bool terminus::anonymous() const { + return pn_terminus_get_address(object_) == NULL; +} + value terminus::node_properties() const { return value(pn_terminus_properties(object_)); } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
