Repository: qpid-proton Updated Branches: refs/heads/master 181bb4bd1 -> b1b85f6c1
PROTON-1089: C++ binding link options part 1 without filters Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b1b85f6c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b1b85f6c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b1b85f6c Branch: refs/heads/master Commit: b1b85f6c12e70ae70cb2d3df59cdd83d8833bcd5 Parents: 181bb4b Author: Clifford Jansen <[email protected]> Authored: Tue Jan 5 14:41:25 2016 -0800 Committer: Clifford Jansen <[email protected]> Committed: Tue Jan 5 14:48:41 2016 -0800 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 1 + examples/cpp/client.cpp | 2 +- examples/cpp/queue_browser.cpp | 59 +++++++++ proton-c/bindings/cpp/CMakeLists.txt | 1 + .../bindings/cpp/include/proton/connection.hpp | 8 +- .../bindings/cpp/include/proton/container.hpp | 21 +++- proton-c/bindings/cpp/include/proton/link.hpp | 33 ++++- .../cpp/include/proton/link_options.hpp | 107 ++++++++++++++++ proton-c/bindings/cpp/include/proton/sender.hpp | 9 ++ .../bindings/cpp/include/proton/session.hpp | 8 +- .../bindings/cpp/include/proton/terminus.hpp | 17 ++- proton-c/bindings/cpp/include/proton/types.hpp | 2 +- proton-c/bindings/cpp/src/blocking_receiver.cpp | 4 +- proton-c/bindings/cpp/src/connection.cpp | 8 +- proton-c/bindings/cpp/src/container.cpp | 9 +- proton-c/bindings/cpp/src/container_impl.cpp | 28 +++-- proton-c/bindings/cpp/src/container_impl.hpp | 9 +- proton-c/bindings/cpp/src/link.cpp | 32 ++++- proton-c/bindings/cpp/src/link_options.cpp | 123 +++++++++++++++++++ proton-c/bindings/cpp/src/sender.cpp | 4 + proton-c/bindings/cpp/src/session.cpp | 11 +- proton-c/bindings/cpp/src/terminus.cpp | 12 +- 22 files changed, 453 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 903294c..4a66925 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -36,6 +36,7 @@ set(examples server_direct recurring_timer connection_options + queue_browser ssl ssl_client_cert encode_decode) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/examples/cpp/client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp index 3f5c4ab..f8186e0 100644 --- a/examples/cpp/client.cpp +++ b/examples/cpp/client.cpp @@ -40,7 +40,7 @@ class client : public proton::messaging_handler { void on_start(proton::event &e) { sender = e.container().open_sender(url); // Create a receiver with a dynamically chosen unique address. - receiver = sender.connection().open_receiver("", true/*dynamic*/); + receiver = sender.connection().open_receiver("", proton::link_options().dynamic_address(true)); } void send_request() { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/examples/cpp/queue_browser.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp new file mode 100644 index 0000000..1206c71 --- /dev/null +++ b/examples/cpp/queue_browser.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/container.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/url.hpp" +#include "proton/link_options.hpp" + +#include <iostream> + +class browser : public proton::messaging_handler { + private: + proton::url url; + + public: + + browser(const proton::url& u) : url(u) {} + + void on_start(proton::event &e) { + proton::connection conn = e.container().connect(url); + conn.open_receiver(url.path(), proton::link_options().browsing(true)); + } + + void on_message(proton::event &e) { + std::cout << e.message().body() << std::endl; + if (e.receiver().queued() == 0 && e.receiver().drained() > 0) + e.connection().close(); + } +}; + +int main(int argc, char **argv) { + try { + std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples"; + browser b(url); + proton::container(b).run(); + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index 732a8c7..7a345c0 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -50,6 +50,7 @@ set(qpid-proton-cpp-source src/event.cpp src/handler.cpp src/link.cpp + src/link_options.cpp src/message.cpp src/messaging_adapter.cpp src/messaging_event.cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/connection.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp index 7a9cf74..67091e3 100644 --- a/proton-c/bindings/cpp/include/proton/connection.hpp +++ b/proton-c/bindings/cpp/include/proton/connection.hpp @@ -88,11 +88,11 @@ class connection : public object<pn_connection_t>, endpoint /** Default session is created on first call and re-used for the lifetime of the connection */ PN_CPP_EXTERN session default_session(); - /** Create a sender on default_session() with target=addr and optional handler h */ - PN_CPP_EXTERN sender open_sender(const std::string &addr, handler *h=0); + /** Create a sender on default_session() with target=addr and link options=opts */ + PN_CPP_EXTERN sender open_sender(const std::string &addr, const link_options &opts = link_options()); - /** Create a receiver on default_session() with target=addr and optional handler h */ - PN_CPP_EXTERN receiver open_receiver(const std::string &addr, bool dynamic=false, handler *h=0); + /** Create a receiver on default_session() with target=addr and optional link options opts */ + PN_CPP_EXTERN receiver open_receiver(const std::string &addr, const link_options &opts = link_options()); /** Return links on this connection matching the state mask. */ PN_CPP_EXTERN link_range find_links(endpoint::state mask) const; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index fbdafb9..44cc476 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -28,6 +28,7 @@ #include "proton/reactor.hpp" #include "proton/url.hpp" #include "proton/connection_options.hpp" +#include "proton/link_options.hpp" #include <string> @@ -68,11 +69,17 @@ class container : public event_loop { /** Run the event loop, return when all connections and acceptors are closed. */ PN_CPP_EXTERN void run(); - /** Open a connection to url and create a sender with target=url.path() */ - PN_CPP_EXTERN sender open_sender(const proton::url &); + /** Open a new connection to url and create a sender with target=url.path(). + Any supplied link or connection options will override the container's + template options. */ + PN_CPP_EXTERN sender open_sender(const proton::url &, const proton::link_options &l = proton::link_options(), + const connection_options &c = connection_options()); - /** Create a receiver on connection with source=url.path() */ - PN_CPP_EXTERN receiver open_receiver(const url &); + /** Create a receiver on a new connection with source=url.path(). Any + supplied link or connection options will override the container's + template options. */ + PN_CPP_EXTERN receiver open_receiver(const url &, const proton::link_options &l = proton::link_options(), + const connection_options &c = connection_options()); /// Identifier for the container PN_CPP_EXTERN std::string id() const; @@ -94,6 +101,12 @@ class container : public event_loop { first open event on the connection. */ PN_CPP_EXTERN void server_connection_options(const connection_options &); + /** Copy the link options to a template applied to new links created and + opened by this container. They are applied before the open event on the + link and may be overriden by link options in other methods. */ + PN_CPP_EXTERN void link_options(const link_options &); + + private: pn_unique_ptr<container_impl> impl_; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/link.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp index b775341..aafaf77 100644 --- a/proton-c/bindings/cpp/include/proton/link.hpp +++ b/proton-c/bindings/cpp/include/proton/link.hpp @@ -27,6 +27,7 @@ #include "proton/terminus.hpp" #include "proton/types.h" #include "proton/object.hpp" +#include "proton/link_options.hpp" #include <string> @@ -39,12 +40,25 @@ class receiver; class link : public object<pn_link_t> , public endpoint { public: + /// Sender settlement behaviour for a link + enum sender_settle_mode_t { + UNSETTLED = PN_SND_UNSETTLED, + SETTLED = PN_SND_SETTLED, + MIXED = PN_SND_MIXED + }; + + /// Receiver settlement behaviour for a link + enum receiver_settle_mode_t { + SETTLE_ALWAYS = PN_RCV_FIRST, + SETTLE_SECOND= PN_RCV_SECOND + }; + link(pn_link_t* l=0) : object<pn_link_t>(l) {} /** Locally open the link, not complete till messaging_handler::on_link_opened or * proton_handler::link_remote_open */ - PN_CPP_EXTERN void open(); + PN_CPP_EXTERN void open(const link_options &opts = link_options()); /** Locally close the link, not complete till messaging_handler::on_link_closed or * proton_handler::link_remote_close @@ -62,8 +76,14 @@ class link : public object<pn_link_t> , public endpoint /** Credit available on the link */ PN_CPP_EXTERN int credit() const; - /** Grant credit to the link */ - PN_CPP_EXTERN void flow(int credit); + /** The number of queued deliveries for the link */ + PN_CPP_EXTERN int queued(); + + /** The number of unsettled deliveries on the link */ + PN_CPP_EXTERN int unsettled(); + + /** The count of credit returned. */ + PN_CPP_EXTERN int drained(); /** Local source of the link. */ PN_CPP_EXTERN terminus source() const; @@ -100,6 +120,13 @@ class link : public object<pn_link_t> , public endpoint /** Navigate the links in a connection - get next link with state */ PN_CPP_EXTERN link next(endpoint::state) const; + + PN_CPP_EXTERN sender_settle_mode_t sender_settle_mode(); + PN_CPP_EXTERN void sender_settle_mode(sender_settle_mode_t); + PN_CPP_EXTERN receiver_settle_mode_t receiver_settle_mode(); + PN_CPP_EXTERN void receiver_settle_mode(receiver_settle_mode_t); + PN_CPP_EXTERN sender_settle_mode_t remote_sender_settle_mode(); + PN_CPP_EXTERN receiver_settle_mode_t remote_receiver_settle_mode(); }; /// An iterator for links. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/link_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/link_options.hpp b/proton-c/bindings/cpp/include/proton/link_options.hpp new file mode 100644 index 0000000..98790db --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/link_options.hpp @@ -0,0 +1,107 @@ +#ifndef PROTON_CPP_LINK_OPTIONS_H +#define PROTON_CPP_LINK_OPTIONS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/config.hpp" +#include "proton/export.hpp" +#include "proton/pn_unique_ptr.hpp" +#include "proton/types.hpp" +#include "proton/terminus.hpp" + +#include <vector> +#include <string> + +namespace proton { + +/** The message delivery policy to establish when opening the link. */ +enum link_delivery_mode_t { + // No set policy. The application must settle messages itself according to its own policy. + NONE = 0, + // Outgoing messages are settled immediately by the link. There are no duplicates. + AT_MOST_ONCE, + // The receiver settles the delivery first with an accept/reject/release disposition. + // The sender waits to settle until after the disposition notification is received. + AT_LEAST_ONCE +}; + +class handler; +class link; + +/** Options for creating a link. + * + * Options can be "chained" like this: + * + * l = container.create_sender(url, link_options().handler(h).browsing(true)); + * + * You can also create an options object with common settings and use it as a base + * for different connections that have mostly the same settings: + * + * link_options opts; + * opts.browsing(true); + * l1 = container.open_sender(url1, opts.handler(h1)); + * c2 = container.open_receiver(url2, opts.handler(h2)); + * + * Normal value semantics, copy or assign creates a separate copy of the options. + */ +class link_options { + public: + PN_CPP_EXTERN link_options(); + PN_CPP_EXTERN link_options(const link_options&); + PN_CPP_EXTERN ~link_options(); + PN_CPP_EXTERN link_options& operator=(const link_options&); + + /// Override with options from other. + PN_CPP_EXTERN void override(const link_options& other); + + /** Set a handler for events scoped to the link. If NULL, link-scoped events on the link are discarded. */ + PN_CPP_EXTERN link_options& handler(class handler *); + /** Receiver-only option to specify whether messages are browsed or + consumed. Setting browsing to true is Equivalent to setting + distribution_mode(COPY). Setting browsing to false is equivalent to + setting distribution_mode(MOVE). */ + PN_CPP_EXTERN link_options& browsing(bool); + /** Set the distribution mode for message transfer. See terminus::distribution_mode_t. */ + PN_CPP_EXTERN link_options& distribution_mode(terminus::distribution_mode_t); + /* Receiver-only option to create a durable subsription on the receiver. + Equivalent to setting the terminus durability to termins::DELIVERIES and + the expiry policy to terminus::EXPIRE_NEVER. */ + PN_CPP_EXTERN link_options& durable_subscription(bool); + /* Set the delivery mode on the link. */ + PN_CPP_EXTERN link_options& delivery_mode(link_delivery_mode_t); + /* Receiver-only option to request a dynamically generated node at the peer. */ + PN_CPP_EXTERN link_options& dynamic_address(bool); + /* Set the local address for the link. */ + PN_CPP_EXTERN link_options& local_address(const std::string &addr); + // TODO: selector/filter, dynamic node properties + + private: + friend class link; + void apply(link&) const; + class handler* handler() const; + + class impl; + pn_unique_ptr<impl> impl_; +}; + +} // namespace + +#endif /*!PROTON_CPP_LINK_OPTIONS_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/sender.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp index af8b9d5..a5835fd 100644 --- a/proton-c/bindings/cpp/include/proton/sender.hpp +++ b/proton-c/bindings/cpp/include/proton/sender.hpp @@ -41,6 +41,15 @@ class sender : public link /// Send a message on the link. PN_CPP_EXTERN delivery send(const message &m); + + /** The number of deliveries that might be able to be sent if sufficient credit were + issued on the link. See sender::offered(). Maintained by the application. */ + PN_CPP_EXTERN int available(); + + /** Set the availability of deliveries for a sender. */ + PN_CPP_EXTERN void offered(int c); + + }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/session.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/session.hpp b/proton-c/bindings/cpp/include/proton/session.hpp index 7873775..0f7c946 100644 --- a/proton-c/bindings/cpp/include/proton/session.hpp +++ b/proton-c/bindings/cpp/include/proton/session.hpp @@ -71,11 +71,11 @@ class session : public object<pn_session_t>, public endpoint */ PN_CPP_EXTERN sender create_sender(const std::string& name=std::string()); - /** Create and open a sender with target=addr and optional handler h */ - PN_CPP_EXTERN sender open_sender(const std::string &addr, handler *h=0); + /** Create and open a sender with target=addr and optional link options opts*/ + PN_CPP_EXTERN sender open_sender(const std::string &addr, const link_options &opts = link_options()); - /** Create and open a receiver with target=addr and optional handler h */ - PN_CPP_EXTERN receiver open_receiver(const std::string &addr, bool dynamic=false, handler *h=0); + /** Create and open a receiver with target=addr and optional link options opts */ + PN_CPP_EXTERN receiver open_receiver(const std::string &addr, const link_options &opts = link_options()); /** Get the endpoint state */ PN_CPP_EXTERN endpoint::state state() const; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/terminus.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/terminus.hpp b/proton-c/bindings/cpp/include/proton/terminus.hpp index 0a1d4b8..3aeda23 100644 --- a/proton-c/bindings/cpp/include/proton/terminus.hpp +++ b/proton-c/bindings/cpp/include/proton/terminus.hpp @@ -32,7 +32,7 @@ namespace proton { class link; /** A terminus represents one end of a link. - * The source terminus is where originate, the target terminus is where they go. + * The source terminus is where messages originate, the target terminus is where they go. */ class terminus { @@ -46,13 +46,21 @@ class terminus COORDINATOR = PN_COORDINATOR ///< Transaction co-ordinator }; - /// Expiry policy - enum expiry_policy_t { + /// Durability + enum durability_t { NONDURABLE = PN_NONDURABLE, CONFIGURATION = PN_CONFIGURATION, DELIVERIES = PN_DELIVERIES }; + /// Expiry policy + enum expiry_policy_t { + EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK, + EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION, + EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION, + EXPIRE_NEVER = PN_EXPIRE_NEVER + }; + /// Distribution mode enum distribution_mode_t { MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED, @@ -66,10 +74,13 @@ class terminus PN_CPP_EXTERN void expiry_policy(expiry_policy_t); PN_CPP_EXTERN distribution_mode_t distribution_mode() const; PN_CPP_EXTERN void distribution_mode(distribution_mode_t); + PN_CPP_EXTERN durability_t durability(); + PN_CPP_EXTERN void durability(durability_t); PN_CPP_EXTERN std::string address() const; PN_CPP_EXTERN void address(const std::string &); PN_CPP_EXTERN bool dynamic() const; PN_CPP_EXTERN void dynamic(bool); + // TODO: filter + related selector private: pn_terminus_t* object_; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/types.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/types.hpp b/proton-c/bindings/cpp/include/proton/types.hpp index 062566a..d0f3da6 100644 --- a/proton-c/bindings/cpp/include/proton/types.hpp +++ b/proton-c/bindings/cpp/include/proton/types.hpp @@ -78,7 +78,7 @@ struct type_error : public decode_error { type_id want; ///< Expected type_id type_id got; ///< Actual type_id }; - + ///@cond INTERNAL /// Provide a full set of comparison operators for proton:: types that have < and ==. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/blocking_receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_receiver.cpp b/proton-c/bindings/cpp/src/blocking_receiver.cpp index 00d3dce..ed70634 100644 --- a/proton-c/bindings/cpp/src/blocking_receiver.cpp +++ b/proton-c/bindings/cpp/src/blocking_receiver.cpp @@ -46,7 +46,7 @@ blocking_receiver::blocking_receiver( class blocking_connection &c, const std::string& addr, int credit, bool dynamic) : blocking_link(c), fetcher_(new blocking_fetcher(credit)) { - open(c.impl_->connection_.open_receiver(addr, dynamic, fetcher_.get())); + open(c.impl_->connection_.open_receiver(addr, link_options().dynamic_address(dynamic).handler(fetcher_.get()))); std::string sa = link_.source().address(); std::string rsa = link_.remote_source().address(); if (!sa.empty() && sa.compare(rsa) != 0) { @@ -56,7 +56,7 @@ blocking_receiver::blocking_receiver( throw error(MSG(txt)); } if (credit) - link_.flow(credit); + link_.receiver().flow(credit); } blocking_receiver::~blocking_receiver() { link_.detach_handler(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp index 41c56f6..7469954 100644 --- a/proton-c/bindings/cpp/src/connection.cpp +++ b/proton-c/bindings/cpp/src/connection.cpp @@ -98,13 +98,13 @@ session connection::default_session() { return ctx.default_session; } -sender connection::open_sender(const std::string &addr, handler *h) { - return default_session().open_sender(addr, h); +sender connection::open_sender(const std::string &addr, const link_options &opts) { + return default_session().open_sender(addr, opts); } -receiver connection::open_receiver(const std::string &addr, bool dynamic, handler *h) +receiver connection::open_receiver(const std::string &addr, const link_options &opts) { - return default_session().open_receiver(addr, dynamic, h); + return default_session().open_receiver(addr, opts); } endpoint::state connection::state() const { return pn_connection_state(pn_object()); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp index 1924cc0..2eaad07 100644 --- a/proton-c/bindings/cpp/src/container.cpp +++ b/proton-c/bindings/cpp/src/container.cpp @@ -21,6 +21,7 @@ #include "proton/container.hpp" #include "messaging_event.hpp" #include "proton/connection.hpp" +#include "proton/link_options.hpp" #include "proton/session.hpp" #include "proton/messaging_adapter.hpp" #include "proton/acceptor.hpp" @@ -58,12 +59,12 @@ std::string container::id() const { return impl_->id_; } void container::run() { impl_->reactor_.run(); } -sender container::open_sender(const proton::url &url) { - return impl_->open_sender(url); +sender container::open_sender(const proton::url &url, const proton::link_options &lo, const connection_options &co) { + return impl_->open_sender(url, lo, co); } -receiver container::open_receiver(const proton::url &url) { - return impl_->open_receiver(url); +receiver container::open_receiver(const proton::url &url, const proton::link_options &lo, const connection_options &co) { + return impl_->open_receiver(url, lo, co); } acceptor container::listen(const proton::url &url, const connection_options &opts) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp index b6e057e..5a50cdd 100644 --- a/proton-c/bindings/cpp/src/container_impl.cpp +++ b/proton-c/bindings/cpp/src/container_impl.cpp @@ -167,21 +167,29 @@ connection container_impl::connect(const proton::url &url, const connection_opti return conn; } -sender container_impl::open_sender(const proton::url &url) { - connection conn = connect(url, connection_options()); +sender container_impl::open_sender(const proton::url &url, const proton::link_options &o1, const connection_options &o2) { + proton::link_options lopts(link_options_); + lopts.override(o1); + connection_options copts(client_connection_options_); + copts.override(o2); + connection conn = connect(url, copts); std::string path = url.path(); - sender snd = conn.default_session().open_sender(id_ + '-' + path); + sender snd = conn.default_session().create_sender(id_ + '-' + path); snd.target().address(path); - snd.open(); + snd.open(lopts); return snd; } -receiver container_impl::open_receiver(const proton::url &url) { - connection conn = connect(url, connection_options()); +receiver container_impl::open_receiver(const proton::url &url, const proton::link_options &o1, const connection_options &o2) { + proton::link_options lopts(link_options_); + lopts.override(o1); + connection_options copts(client_connection_options_); + copts.override(o2); + connection conn = connect(url, copts); std::string path = url.path(); - receiver rcv = conn.default_session().open_receiver(id_ + '-' + path); + receiver rcv = conn.default_session().create_receiver(id_ + '-' + path); rcv.source().address(path); - rcv.open(); + rcv.open(lopts); return rcv; } @@ -225,6 +233,10 @@ void container_impl::server_connection_options(const connection_options &opts) { server_connection_options_ = opts; } +void container_impl::link_options(const proton::link_options &opts) { + link_options_ = opts; +} + void container_impl::configure_server_connection(connection &c) { pn_acceptor_t *pnp = pn_connection_acceptor(connection_options::pn_connection(c)); listener_context &lc(listener_context::get(pnp)); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp index 31ec0cf..66c3aa6 100644 --- a/proton-c/bindings/cpp/src/container_impl.hpp +++ b/proton-c/bindings/cpp/src/container_impl.hpp @@ -48,10 +48,8 @@ class container_impl PN_CPP_EXTERN container_impl(container&, handler *, const std::string& id); PN_CPP_EXTERN ~container_impl(); PN_CPP_EXTERN connection connect(const url&, const connection_options&); - PN_CPP_EXTERN sender open_sender(connection &connection, const std::string &addr, handler *h); - PN_CPP_EXTERN sender open_sender(const url&); - PN_CPP_EXTERN receiver open_receiver(connection &connection, const std::string &addr, bool dynamic, handler *h); - PN_CPP_EXTERN receiver open_receiver(const url&); + PN_CPP_EXTERN sender open_sender(const url&, const proton::link_options &, const connection_options &); + PN_CPP_EXTERN receiver open_receiver(const url&, const proton::link_options &, const connection_options &); PN_CPP_EXTERN class acceptor listen(const url&, const connection_options &); PN_CPP_EXTERN duration timeout(); PN_CPP_EXTERN void timeout(duration timeout); @@ -59,6 +57,8 @@ class container_impl const connection_options& client_connection_options() { return client_connection_options_; } void server_connection_options(const connection_options &); const connection_options& server_connection_options() { return server_connection_options_; } + void link_options(const proton::link_options&); + const proton::link_options& link_options() { return link_options_; } void configure_server_connection(connection &c); task schedule(int delay, handler *h); @@ -78,6 +78,7 @@ class container_impl uint64_t link_id_; connection_options client_connection_options_; connection_options server_connection_options_; + proton::link_options link_options_; friend class container; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp index cb66933..37337c0 100644 --- a/proton-c/bindings/cpp/src/link.cpp +++ b/proton-c/bindings/cpp/src/link.cpp @@ -31,7 +31,8 @@ namespace proton { -void link::open() { +void link::open(const link_options &lo) { + lo.apply(*this); pn_link_open(pn_object()); } @@ -59,9 +60,9 @@ int link::credit() const { return pn_link_credit(pn_object()); } -void link::flow(int credit) { - pn_link_flow(pn_object(), credit); -} +int link::queued() { return pn_link_queued(pn_object()); } +int link::unsettled() { return pn_link_unsettled(pn_object()); } +int link::drained() { return pn_link_drained(pn_object()); } terminus link::source() const { return pn_link_source(pn_object()); } terminus link::target() const { return pn_link_target(pn_object()); } @@ -107,5 +108,28 @@ link link::next(endpoint::state s) const return pn_link_next(pn_object(), s); } +link::sender_settle_mode_t link::sender_settle_mode() { + return (sender_settle_mode_t) pn_link_snd_settle_mode(pn_object()); +} + +void link::sender_settle_mode(sender_settle_mode_t mode) { + pn_link_set_snd_settle_mode(pn_object(), (pn_snd_settle_mode_t) mode); +} + +link::receiver_settle_mode_t link::receiver_settle_mode() { + return (receiver_settle_mode_t) pn_link_rcv_settle_mode(pn_object()); +} + +void link::receiver_settle_mode(receiver_settle_mode_t mode) { + pn_link_set_rcv_settle_mode(pn_object(), (pn_rcv_settle_mode_t) mode); +} + +link::sender_settle_mode_t link::remote_sender_settle_mode() { + return (sender_settle_mode_t) pn_link_remote_snd_settle_mode(pn_object()); } +link::receiver_settle_mode_t link::remote_receiver_settle_mode() { + return (receiver_settle_mode_t) pn_link_remote_rcv_settle_mode(pn_object()); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/link_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/link_options.cpp b/proton-c/bindings/cpp/src/link_options.cpp new file mode 100644 index 0000000..1eabdf4 --- /dev/null +++ b/proton-c/bindings/cpp/src/link_options.cpp @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/link_options.hpp" +#include "proton/link.hpp" +#include "msg.hpp" + + +namespace proton { + +template <class T> struct option { + T value; + bool set; + + option() : value(), set(false) {} + option& operator=(const T& x) { value = x; set = true; return *this; } + void override(const option<T>& x) { if (x.set) *this = x.value; } +}; + +class link_options::impl { + public: + option<class handler*> handler; + option<terminus::distribution_mode_t> distribution_mode; + option<bool> durable_subscription; + option<link_delivery_mode_t> delivery_mode; + option<bool> dynamic_address; + option<std::string> local_address; + + void apply(link& l) { + if (l.state() & endpoint::LOCAL_UNINIT) { + bool sender = !l.receiver(); + if (local_address.set) { + const char *addr = local_address.value.empty() ? NULL : local_address.value.c_str(); + if (sender) + l.target().address(addr); + else + l.source().address(addr); + } + if (delivery_mode.set) { + switch (delivery_mode.value) { + case AT_MOST_ONCE: + l.sender_settle_mode(link::SETTLED); + break; + case AT_LEAST_ONCE: + l.sender_settle_mode(link::UNSETTLED); + l.receiver_settle_mode(link::SETTLE_ALWAYS); + break; + default: + break; + } + } + if (handler.set) { + if (handler.value) + l.handler(*handler.value); + else + l.detach_handler(); + } + if (!sender) { + // receiver only options + if (distribution_mode.set) l.source().distribution_mode(distribution_mode.value); + if (durable_subscription.set && durable_subscription.value) { + l.source().durability(terminus::DELIVERIES); + l.source().expiry_policy(terminus::EXPIRE_NEVER); + } + if (dynamic_address.set) + l.source().dynamic(dynamic_address.value); + } + } + } + + void override(const impl& x) { + handler.override(x.handler); + distribution_mode.override(x.distribution_mode); + durable_subscription.override(x.durable_subscription); + delivery_mode.override(x.delivery_mode); + dynamic_address.override(x.dynamic_address); + local_address.override(x.local_address); + } + +}; + +link_options::link_options() : impl_(new impl()) {} +link_options::link_options(const link_options& x) : impl_(new impl()) { + *this = x; +} +link_options::~link_options() {} + +link_options& link_options::operator=(const link_options& x) { + *impl_ = *x.impl_; + return *this; +} + +void link_options::override(const link_options& x) { impl_->override(*x.impl_); } + +link_options& link_options::handler(class handler *h) { impl_->handler = h; return *this; } +link_options& link_options::browsing(bool b) { distribution_mode(b ? terminus::COPY : terminus::MOVE); return *this; } +link_options& link_options::distribution_mode(terminus::distribution_mode_t m) { impl_->distribution_mode = m; return *this; } +link_options& link_options::durable_subscription(bool b) {impl_->durable_subscription = b; return *this; } +link_options& link_options::delivery_mode(link_delivery_mode_t m) {impl_->delivery_mode = m; return *this; } +link_options& link_options::dynamic_address(bool b) {impl_->dynamic_address = b; return *this; } +link_options& link_options::local_address(const std::string &addr) {impl_->local_address = addr; return *this; } + +void link_options::apply(link& l) const { impl_->apply(l); } +handler* link_options::handler() const { return impl_->handler.value; } + +} // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sender.cpp b/proton-c/bindings/cpp/src/sender.cpp index 177e6f0..56f737b 100644 --- a/proton-c/bindings/cpp/src/sender.cpp +++ b/proton-c/bindings/cpp/src/sender.cpp @@ -54,4 +54,8 @@ delivery sender::send(const message &message) { return dlv; } +int sender::available() { return pn_link_available(pn_object()); } +void sender::offered(int c) { pn_link_offered(pn_object(), c); } + + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/session.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/session.cpp b/proton-c/bindings/cpp/src/session.cpp index c6c2233..c851ef7 100644 --- a/proton-c/bindings/cpp/src/session.cpp +++ b/proton-c/bindings/cpp/src/session.cpp @@ -53,21 +53,18 @@ sender session::create_sender(const std::string& name) { return pn_sender(pn_object(), set_name(name, this).c_str()); } -sender session::open_sender(const std::string &addr, handler *h) { +sender session::open_sender(const std::string &addr, const link_options &lo) { sender snd = create_sender(); snd.target().address(addr); - if (h) snd.handler(*h); - snd.open(); + snd.open(lo); return snd; } -receiver session::open_receiver(const std::string &addr, bool dynamic, handler *h) +receiver session::open_receiver(const std::string &addr, const link_options &lo) { receiver rcv = create_receiver(); rcv.source().address(addr); - if (dynamic) rcv.source().dynamic(true); - if (h) rcv.handler(*h); - rcv.open(); + rcv.open(lo); return rcv; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/terminus.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/terminus.cpp b/proton-c/bindings/cpp/src/terminus.cpp index 1a8ed8a..cae148a 100644 --- a/proton-c/bindings/cpp/src/terminus.cpp +++ b/proton-c/bindings/cpp/src/terminus.cpp @@ -33,7 +33,7 @@ void terminus::type(type_t type) { } terminus::expiry_policy_t terminus::expiry_policy() const { - return expiry_policy_t(pn_terminus_get_type(object_)); + return expiry_policy_t(pn_terminus_get_expiry_policy(object_)); } void terminus::expiry_policy(expiry_policy_t policy) { @@ -41,13 +41,21 @@ void terminus::expiry_policy(expiry_policy_t policy) { } terminus::distribution_mode_t terminus::distribution_mode() const { - return distribution_mode_t(pn_terminus_get_type(object_)); + return distribution_mode_t(pn_terminus_get_distribution_mode(object_)); } void terminus::distribution_mode(distribution_mode_t mode) { pn_terminus_set_distribution_mode(object_, pn_distribution_mode_t(mode)); } +terminus::durability_t terminus::durability() { + return (durability_t) pn_terminus_get_durability(object_); +} + +void terminus::durability(durability_t mode) { + pn_terminus_set_durability(object_, (pn_durability_t) mode); +} + std::string terminus::address() const { const char *addr = pn_terminus_get_address(object_); return addr ? std::string(addr) : std::string(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
