Repository: qpid-proton Updated Branches: refs/heads/cjansen-cpp-client c4e2e596d -> 8e1757ebc
PROTON-865: C++ blocking receiver Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4339b1c7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4339b1c7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4339b1c7 Branch: refs/heads/cjansen-cpp-client Commit: 4339b1c7a7d2813ec091578af5508885ab29f43c Parents: c4e2e59 Author: Clifford Jansen <[email protected]> Authored: Sun Jul 26 10:13:30 2015 -0700 Committer: Clifford Jansen <[email protected]> Committed: Sun Jul 26 10:13:30 2015 -0700 ---------------------------------------------------------------------- examples/cpp/helloworld_blocking.cpp | 35 ++----- proton-c/bindings/cpp/CMakeLists.txt | 2 + .../cpp/include/proton/blocking_connection.hpp | 3 + .../cpp/include/proton/blocking_receiver.hpp | 60 +++++++++++ .../bindings/cpp/include/proton/container.hpp | 5 +- .../bindings/cpp/include/proton/delivery.hpp | 3 +- proton-c/bindings/cpp/include/proton/event.hpp | 2 + .../cpp/include/proton/messaging_event.hpp | 1 + .../cpp/include/proton/proton_event.hpp | 1 + .../bindings/cpp/include/proton/receiver.hpp | 1 + .../bindings/cpp/src/blocking_connection.cpp | 19 ++++ .../cpp/src/blocking_connection_impl.hpp | 6 +- proton-c/bindings/cpp/src/blocking_link.cpp | 4 +- proton-c/bindings/cpp/src/blocking_receiver.cpp | 104 +++++++++++++++++++ proton-c/bindings/cpp/src/blocking_sender.cpp | 2 +- proton-c/bindings/cpp/src/connection_impl.cpp | 4 + proton-c/bindings/cpp/src/container.cpp | 5 +- proton-c/bindings/cpp/src/container_impl.cpp | 25 ++++- proton-c/bindings/cpp/src/container_impl.hpp | 3 +- proton-c/bindings/cpp/src/delivery.cpp | 4 + proton-c/bindings/cpp/src/event.cpp | 4 + proton-c/bindings/cpp/src/fetcher.cpp | 92 ++++++++++++++++ proton-c/bindings/cpp/src/fetcher.hpp | 58 +++++++++++ proton-c/bindings/cpp/src/messaging_event.cpp | 8 ++ proton-c/bindings/cpp/src/proton_event.cpp | 7 ++ proton-c/bindings/cpp/src/receiver.cpp | 3 + 26 files changed, 419 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/examples/cpp/helloworld_blocking.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/helloworld_blocking.cpp b/examples/cpp/helloworld_blocking.cpp index 2b6b4db..9f56fa2 100644 --- a/examples/cpp/helloworld_blocking.cpp +++ b/examples/cpp/helloworld_blocking.cpp @@ -22,43 +22,28 @@ #include "proton/container.hpp" #include "proton/messaging_handler.hpp" #include "proton/blocking_sender.hpp" +#include "proton/blocking_receiver.hpp" +#include "proton/duration.hpp" #include <iostream> -class hello_world_blocking : public proton::messaging_handler { - private: - proton::url url; - - public: - - hello_world_blocking(const proton::url& u) : url(u) {} - - void on_start(proton::event &e) { - proton::connection conn = e.container().connect(url); - e.container().create_receiver(conn, url.path()); - } - - void on_message(proton::event &e) { - std::cout << e.message().body() << std::endl; - e.connection().close(); - } - -}; - int main(int argc, char **argv) { try { proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples"); - proton::blocking_connection conn(url); + proton::blocking_receiver receiver = conn.create_receiver(url.path()); proton::blocking_sender sender = conn.create_sender(url.path()); + proton::message m; m.body("Hello World!"); sender.send(m); - conn.close(); - // TODO Temporary hack until blocking receiver available - hello_world_blocking hw(url); - proton::container(hw).run(); + proton::duration timeout(30000); + proton::message m2 = receiver.receive(timeout); + std::cout << m2.body() << std::endl; + receiver.accept(); + + conn.close(); return 0; } catch (const std::exception& e) { std::cerr << e.what() << std::endl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index 6b56a8e..fcb0fb1 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -64,10 +64,12 @@ set(qpid-proton-cpp-source src/value.cpp src/values.cpp src/proton_bits.cpp + src/fetcher.cpp src/blocking_connection.cpp src/blocking_connection_impl.cpp src/blocking_link.cpp src/blocking_sender.cpp + src/blocking_receiver.cpp src/contexts.cpp src/types.cpp ) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/blocking_connection.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp index 9aa456f..b6f0499 100644 --- a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp +++ b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp @@ -38,6 +38,7 @@ class container; class blocking_connection_impl; class ssl_domain; class blocking_sender; +class blocking_receiver; class wait_condition; // TODO documentation @@ -54,6 +55,8 @@ class blocking_connection : public handle<blocking_connection_impl> PN_CPP_EXTERN void close(); PN_CPP_EXTERN blocking_sender create_sender(const std::string &address, handler *h=0); + PN_CPP_EXTERN blocking_receiver create_receiver(const std::string &address, int credit = 0, + bool dynamic = false, std::string name = std::string()); PN_CPP_EXTERN void wait(wait_condition &condition); PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, duration timeout=duration::FOREVER); PN_CPP_EXTERN duration timeout(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp new file mode 100644 index 0000000..319fd93 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp @@ -0,0 +1,60 @@ +#ifndef PROTON_CPP_BLOCKING_RECEIVER_HPP +#define PROTON_CPP_BLOCKING_RECEIVER_HPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/export.hpp" +#include "proton/container.hpp" +#include "proton/blocking_link.hpp" +#include "proton/duration.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/types.h" +#include "proton/delivery.h" +#include <string> + +namespace proton { + +class blocking_connection; +class blocking_link; +class fetcher; + +class blocking_receiver : public blocking_link +{ + public: + PN_CPP_EXTERN blocking_receiver(const blocking_receiver&); + PN_CPP_EXTERN blocking_receiver& operator=(const blocking_receiver&); + PN_CPP_EXTERN ~blocking_receiver(); + PN_CPP_EXTERN message receive(); + PN_CPP_EXTERN message receive(duration timeout); + PN_CPP_EXTERN void accept(); + PN_CPP_EXTERN void reject(); + PN_CPP_EXTERN void release(bool delivered = true); + PN_CPP_EXTERN void settle(); + PN_CPP_EXTERN void settle(delivery::state state); + private: + blocking_receiver(blocking_connection &c, receiver &l, fetcher &f, int credit); + fetcher &fetcher_; + friend class blocking_connection; +}; + +} + +#endif /*!PROTON_CPP_BLOCKING_RECEIVER_HPP*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index ec55167..a0ca59a 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -81,9 +81,9 @@ class container : public handle<container_impl> PN_CPP_EXTERN sender create_sender(const proton::url &); /** Create a receiver on connection with target=addr and optional handler h */ - PN_CPP_EXTERN receiver create_receiver(connection &connection, const std::string &addr, handler *h=0); + PN_CPP_EXTERN receiver create_receiver(connection &connection, const std::string &addr, bool dynamic=false, handler *h=0); - /** Create a receiver on connection with source=addr and optional handler h */ + /** Create a receiver on connection with source=url.path() */ PN_CPP_EXTERN receiver create_receiver(const url &); /** Open a connection to url and create a receiver with source=url.path() */ @@ -103,6 +103,7 @@ class container : public handle<container_impl> PN_CPP_EXTERN void wakeup(); PN_CPP_EXTERN bool is_quiesced(); + PN_CPP_EXTERN void yield(); private: friend class private_impl_ref<container>; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/delivery.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/delivery.hpp b/proton-c/bindings/cpp/include/proton/delivery.hpp index 3f4dafc..3fa3069 100644 --- a/proton-c/bindings/cpp/include/proton/delivery.hpp +++ b/proton-c/bindings/cpp/include/proton/delivery.hpp @@ -56,7 +56,8 @@ class delivery : public proton_handle<pn_delivery_t> /** Settle the delivery, informs the remote end. */ PN_CPP_EXTERN void settle(); - // TODO aconway 2015-07-15: add update() here? + /** Set the local state of the delivery. */ + PN_CPP_EXTERN void update(delivery::state state); PN_CPP_EXTERN pn_delivery_t *pn_delivery(); private: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/event.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/event.hpp b/proton-c/bindings/cpp/include/proton/event.hpp index c4c5d7c..22866e1 100644 --- a/proton-c/bindings/cpp/include/proton/event.hpp +++ b/proton-c/bindings/cpp/include/proton/event.hpp @@ -52,6 +52,8 @@ class event { virtual PN_CPP_EXTERN class receiver receiver(); /// Get link @throws error if no link. virtual PN_CPP_EXTERN class link link(); + /// Get delivey @throws error if no delivery. + virtual PN_CPP_EXTERN class delivery delivery(); /// Get message @throws error if no message. virtual PN_CPP_EXTERN class message message(); /// Get message @throws error if no message. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/messaging_event.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/messaging_event.hpp b/proton-c/bindings/cpp/include/proton/messaging_event.hpp index 950c389..e27003c 100644 --- a/proton-c/bindings/cpp/include/proton/messaging_event.hpp +++ b/proton-c/bindings/cpp/include/proton/messaging_event.hpp @@ -89,6 +89,7 @@ class messaging_event : public proton_event virtual PN_CPP_EXTERN class receiver receiver(); virtual PN_CPP_EXTERN class link link(); virtual PN_CPP_EXTERN class message message(); + virtual PN_CPP_EXTERN class delivery delivery(); virtual PN_CPP_EXTERN void message(class message &); PN_CPP_EXTERN event_type type() const; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/proton_event.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/proton_event.hpp b/proton-c/bindings/cpp/include/proton/proton_event.hpp index d811ae5..1243717 100644 --- a/proton-c/bindings/cpp/include/proton/proton_event.hpp +++ b/proton-c/bindings/cpp/include/proton/proton_event.hpp @@ -274,6 +274,7 @@ class proton_event : public event virtual PN_CPP_EXTERN class sender sender(); virtual PN_CPP_EXTERN class receiver receiver(); virtual PN_CPP_EXTERN class link link(); + virtual PN_CPP_EXTERN class delivery delivery(); /** Get type of event */ PN_CPP_EXTERN event_type type(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/receiver.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/receiver.hpp b/proton-c/bindings/cpp/include/proton/receiver.hpp index 4f2df0f..1974cd0 100644 --- a/proton-c/bindings/cpp/include/proton/receiver.hpp +++ b/proton-c/bindings/cpp/include/proton/receiver.hpp @@ -38,6 +38,7 @@ class receiver : public link PN_CPP_EXTERN receiver(pn_link_t *lnk); PN_CPP_EXTERN receiver(); PN_CPP_EXTERN receiver(const link& c); + PN_CPP_EXTERN void flow(int count); protected: PN_CPP_EXTERN virtual void verify_type(pn_link_t *l); }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection.cpp b/proton-c/bindings/cpp/src/blocking_connection.cpp index 02cec85..dd1aebb 100644 --- a/proton-c/bindings/cpp/src/blocking_connection.cpp +++ b/proton-c/bindings/cpp/src/blocking_connection.cpp @@ -21,9 +21,11 @@ #include "proton/container.hpp" #include "proton/blocking_connection.hpp" #include "proton/blocking_sender.hpp" +#include "proton/blocking_receiver.hpp" #include "proton/messaging_handler.hpp" #include "proton/url.hpp" #include "proton/error.hpp" +#include "fetcher.hpp" #include "msg.hpp" #include "blocking_connection_impl.hpp" #include "private_impl_ref.hpp" @@ -57,6 +59,23 @@ blocking_sender blocking_connection::create_sender(const std::string &address, h return blocking_sender(*this, sender); } +namespace { +struct fetcher_guard{ + fetcher_guard(fetcher &f) : fetcher_(f) { fetcher_.incref(); } + ~fetcher_guard() { fetcher_.decref(); } + fetcher& fetcher_; +}; +} + +blocking_receiver blocking_connection::create_receiver(const std::string &address, int credit, + bool dynamic, std::string name) { + fetcher *f = new fetcher(*this, credit); + fetcher_guard fg(*f); + receiver receiver = impl_->container_.create_receiver(impl_->connection_, address, dynamic, f); + blocking_receiver brcv(*this, receiver, *f, credit); + return brcv; +} + duration blocking_connection::timeout() { return impl_->timeout(); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_connection_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp index edb08a6..7e8c031 100644 --- a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp +++ b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp @@ -1,5 +1,5 @@ -#ifndef PROTON_CPP_CONNECTIONIMPL_H -#define PROTON_CPP_CONNECTIONIMPL_H +#ifndef PROTON_CPP_BLOCKINGCONNECTIONIMPL_H +#define PROTON_CPP_BLOCKINGCONNECTIONIMPL_H /* * @@ -59,4 +59,4 @@ class ssl_domain; } -#endif /*!PROTON_CPP_CONNECTIONIMPL_H*/ +#endif /*!PROTON_CPP_BLOCKINGCONNECTIONIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_link.cpp b/proton-c/bindings/cpp/src/blocking_link.cpp index b9f23c4..c7f3551 100644 --- a/proton-c/bindings/cpp/src/blocking_link.cpp +++ b/proton-c/bindings/cpp/src/blocking_link.cpp @@ -55,6 +55,7 @@ blocking_link::blocking_link(blocking_connection *c, pn_link_t *pnl) : connectio std::string msg = "Opening link " + link_.name(); link_opened link_opened(link_.pn_link()); connection_.wait(link_opened, msg); + check_closed(); } blocking_link::~blocking_link() {} @@ -70,8 +71,7 @@ void blocking_link::check_closed() { pn_link_t * pn_link = link_.pn_link(); if (pn_link_state(pn_link) & PN_REMOTE_CLOSED) { link_.close(); - // TODO: link_detached exception - throw error(MSG("link detached")); + throw error(MSG("Link detached: " << link_.name())); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_receiver.cpp b/proton-c/bindings/cpp/src/blocking_receiver.cpp new file mode 100644 index 0000000..042eb29 --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking_receiver.cpp @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/blocking_receiver.hpp" +#include "proton/blocking_connection.hpp" +#include "proton/wait_condition.hpp" +#include "proton/error.hpp" +#include "fetcher.hpp" +#include "msg.hpp" + + +namespace proton { + +namespace { + +struct fetcher_has_message : public wait_condition { + fetcher_has_message(fetcher &f) : fetcher_(f) {} + bool achieved() { return fetcher_.has_message(); } + fetcher &fetcher_; +}; + +} // namespace + + +blocking_receiver::blocking_receiver(blocking_connection &c, receiver &l, fetcher &f, int credit) + : blocking_link(&c, l.pn_link()), fetcher_(f) { + std::string sa = link_.source().address(); + std::string rsa = link_.remote_source().address(); + if (sa.empty() || sa.compare(rsa) != 0) { + wait_for_closed(); + link_.close(); + std::string txt = "Failed to open receiver " + link_.name() + ", source does not match"; + throw error(MSG(txt)); + } + if (credit) + pn_link_flow(link_.pn_link(), credit); + fetcher_.incref(); +} + +blocking_receiver::blocking_receiver(const blocking_receiver& r) : blocking_link(r), fetcher_(r.fetcher_) { + fetcher_.incref(); +} +blocking_receiver& blocking_receiver::operator=(const blocking_receiver& r) { + if (this == &r) return *this; + fetcher_ = r.fetcher_; + fetcher_.incref(); + return *this; +} +blocking_receiver::~blocking_receiver() { fetcher_.decref(); } + + + +message blocking_receiver::receive(duration timeout) { + receiver rcv = link_; + if (!rcv.credit()) + rcv.flow(1); + std::string txt = "Receiving on receiver " + link_.name(); + fetcher_has_message cond(fetcher_); + connection_.wait(cond, txt, timeout); + return fetcher_.pop(); +} + +message blocking_receiver::receive() { + // Use default timeout + return receive(connection_.timeout()); +} + +void blocking_receiver::accept() { + settle(delivery::ACCEPTED); +} + +void blocking_receiver::reject() { + settle(delivery::REJECTED); +} + +void blocking_receiver::release(bool delivered) { + if (delivered) + settle(delivery::MODIFIED); + else + settle(delivery::RELEASED); +} + +void blocking_receiver::settle(delivery::state state = delivery::NONE) { + fetcher_.settle(state); +} + +} // namespace http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_sender.cpp b/proton-c/bindings/cpp/src/blocking_sender.cpp index 2ab1ef1..8fea98a 100644 --- a/proton-c/bindings/cpp/src/blocking_sender.cpp +++ b/proton-c/bindings/cpp/src/blocking_sender.cpp @@ -44,7 +44,7 @@ blocking_sender::blocking_sender(blocking_connection &c, sender &l) : blocking_l wait_for_closed(); link_.close(); std::string txt = "Failed to open sender " + link_.name() + ", target does not match"; - throw error(MSG("container not started")); + throw error(MSG(txt)); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/connection_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_impl.cpp b/proton-c/bindings/cpp/src/connection_impl.cpp index 51da569..e2f4608 100644 --- a/proton-c/bindings/cpp/src/connection_impl.cpp +++ b/proton-c/bindings/cpp/src/connection_impl.cpp @@ -103,6 +103,10 @@ connection &connection_impl::connection() { container &connection_impl::container() { return (container_); } + +// TODO: Rework this. Rename and document excellently for user handlers on connections. +// Better: provide general solution for handlers that delete before the C reactor object +// has finished sending events. void connection_impl::reactor_detach() { // "save" goes out of scope last, preventing possible recursive destructor // confusion with reactor_reference. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp index 07cb971..a424c0b 100644 --- a/proton-c/bindings/cpp/src/container.cpp +++ b/proton-c/bindings/cpp/src/container.cpp @@ -72,8 +72,8 @@ sender container::create_sender(const proton::url &url) { return impl_->create_sender(url); } -receiver container::create_receiver(connection &connection, const std::string &addr, handler *h) { - return impl_->create_receiver(connection, addr, h); +receiver container::create_receiver(connection &connection, const std::string &addr, bool dynamic, handler *h) { + return impl_->create_receiver(connection, addr, dynamic, h); } receiver container::create_receiver(const proton::url &url) { @@ -91,5 +91,6 @@ bool container::process() { return impl_->process(); } void container::stop() { impl_->stop(); } void container::wakeup() { impl_->wakeup(); } bool container::is_quiesced() { return impl_->is_quiesced(); } +void container::yield() { impl_->yield(); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp index 0858d73..e328251 100644 --- a/proton-c/bindings/cpp/src/container_impl.cpp +++ b/proton-c/bindings/cpp/src/container_impl.cpp @@ -148,6 +148,9 @@ void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_ container c(inbound_context::get(c_handler)->container_impl_); messaging_event mevent(cevent, type, c); mevent.dispatch(*inbound_context::get(c_handler)->cpp_handler_); + // Possible decref and deletion via a handler action on this event. + // return without further processing. + return; } void cpp_handler_cleanup(pn_handler_t *c_handler) @@ -199,7 +202,7 @@ container_impl::~container_impl() { connection container_impl::connect(const proton::url &url, handler *h) { if (!reactor_) throw error(MSG("container not started")); container ctnr(this); - connection conn(ctnr, handler_); + connection conn(ctnr, h); connector *ctor = new connector(conn); // connector self-deletes depending on reconnect logic ctor->address(url); // TODO: url vector @@ -238,7 +241,9 @@ sender container_impl::create_sender(connection &connection, const std::string & pn_terminus_set_address(pn_link_target(lnk), addr.c_str()); if (h) { pn_record_t *record = pn_link_attachments(lnk); - pn_record_set_handler(record, wrap_handler(h)); + pn_handler_t *chandler = wrap_handler(h); + pn_record_set_handler(record, chandler); + pn_decref(chandler); } snd.open(); return snd; @@ -255,16 +260,21 @@ sender container_impl::create_sender(const proton::url &url) { return snd; } -receiver container_impl::create_receiver(connection &connection, const std::string &addr, handler *h) { +receiver container_impl::create_receiver(connection &connection, const std::string &addr, bool dynamic, handler *h) { if (!reactor_) throw error(MSG("container not started")); connection_impl *conn_impl = impl(connection); session session = default_session(conn_impl->pn_connection_, &conn_impl->default_session_); receiver rcv = session.create_receiver(container_id_ + '-' + addr); pn_link_t *lnk = rcv.pn_link(); - pn_terminus_set_address(pn_link_source(lnk), addr.c_str()); + pn_terminus_t *src = pn_link_source(lnk); + pn_terminus_set_address(src, addr.c_str()); + if (dynamic) + pn_terminus_set_dynamic(src, true); if (h) { pn_record_t *record = pn_link_attachments(lnk); - pn_record_set_handler(record, wrap_handler(h)); + pn_handler_t *chandler = wrap_handler(h); + pn_record_set_handler(record, chandler); + pn_decref(chandler); } rcv.open(); return rcv; @@ -360,4 +370,9 @@ bool container_impl::is_quiesced() { return pn_reactor_quiesced(reactor_); } +void container_impl::yield() { + if (!reactor_) throw error(MSG("container not started")); + pn_reactor_yield(reactor_); +} + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp index 4378f7d..1ce27ee 100644 --- a/proton-c/bindings/cpp/src/container_impl.hpp +++ b/proton-c/bindings/cpp/src/container_impl.hpp @@ -48,7 +48,7 @@ class container_impl PN_CPP_EXTERN pn_reactor_t *reactor(); PN_CPP_EXTERN sender create_sender(connection &connection, const std::string &addr, handler *h); PN_CPP_EXTERN sender create_sender(const url&); - PN_CPP_EXTERN receiver create_receiver(connection &connection, const std::string &addr, handler *h); + PN_CPP_EXTERN receiver create_receiver(connection &connection, const std::string &addr, bool dynamic, handler *h); PN_CPP_EXTERN receiver create_receiver(const url&); PN_CPP_EXTERN class acceptor listen(const url&); PN_CPP_EXTERN std::string container_id(); @@ -59,6 +59,7 @@ class container_impl void stop(); void wakeup(); bool is_quiesced(); + void yield(); pn_handler_t *wrap_handler(handler *h); static void incref(container_impl *); static void decref(container_impl *); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/delivery.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/delivery.cpp b/proton-c/bindings/cpp/src/delivery.cpp index 7ff596f..6c87939 100644 --- a/proton-c/bindings/cpp/src/delivery.cpp +++ b/proton-c/bindings/cpp/src/delivery.cpp @@ -52,6 +52,10 @@ void delivery::settle() { pn_delivery_settle(impl_); } +void delivery::update(delivery::state state) { + pn_delivery_update(impl_, state); +} + pn_delivery_t *delivery::pn_delivery() { return impl_; } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/event.cpp b/proton-c/bindings/cpp/src/event.cpp index e12d19c..2affed1 100644 --- a/proton-c/bindings/cpp/src/event.cpp +++ b/proton-c/bindings/cpp/src/event.cpp @@ -57,6 +57,10 @@ class link event::link() { throw error(MSG("No link context for event")); } +class delivery event::delivery() { + throw error(MSG("No link context for event")); +} + class message event::message() { throw error(MSG("No message associated with event")); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/fetcher.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/fetcher.cpp b/proton-c/bindings/cpp/src/fetcher.cpp new file mode 100644 index 0000000..46ee18c --- /dev/null +++ b/proton-c/bindings/cpp/src/fetcher.cpp @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "fetcher.hpp" +#include "proton/event.hpp" + +namespace proton { + +fetcher::fetcher(blocking_connection &c, int prefetch) : + messaging_handler(prefetch, false), // no auto_accept + connection_(c), refcount_(0), pn_link_(0) { +} + +void fetcher::incref() { refcount_++; } +void fetcher::decref() { + refcount_--; + if (!refcount_) { + // fetcher needs to outlive its blocking_receiver unless disconnected from reactor + if (pn_link_) { + pn_record_set_handler(pn_link_attachments(pn_link_), 0); + pn_decref(pn_link_); + } + delete this; + return; + } +} + +void fetcher::on_link_init(event &e) { + pn_link_ = e.link().pn_link(); + pn_incref(pn_link_); +} + +void fetcher::on_message(event &e) { + messages_.push_back(e.message()); + deliveries_.push_back(e.delivery()); + // Wake up enclosing blocking_connection.wait() + e.container().yield(); +} + +void fetcher::on_link_error(event &e) { + link lnk = e.link(); + if (pn_link_state(lnk.pn_link()) & PN_LOCAL_ACTIVE) { + lnk.close(); + throw error(MSG("Link detached: " << lnk.name())); + } +} + +void fetcher::on_connection_error(event &e) { + throw error(MSG("Connection closed")); +} + +bool fetcher::has_message() { + return !messages_.empty(); +} + +message fetcher::pop() { + if (messages_.empty()) + throw error(MSG("blocking_receiver has no messages")); + delivery &dlv(deliveries_.front()); + if (!dlv.settled()) + unsettled_.push_back(dlv); + message m = messages_.front(); + messages_.pop_front(); + deliveries_.pop_front(); + return m; +} + +void fetcher::settle(delivery::state state) { + delivery &dlv = unsettled_.front(); + if (state) + dlv.update(state); + dlv.settle(); +} + +} // namespace http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/fetcher.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/fetcher.hpp b/proton-c/bindings/cpp/src/fetcher.hpp new file mode 100644 index 0000000..85200fe --- /dev/null +++ b/proton-c/bindings/cpp/src/fetcher.hpp @@ -0,0 +1,58 @@ +#ifndef PROTON_CPP_FETCHER_H +#define PROTON_CPP_FETCHER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/container.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/blocking_connection.hpp" +#include "proton/error.hpp" +#include "msg.hpp" +#include <string> +#include <deque> + +namespace proton { + +class fetcher : public messaging_handler { + private: + blocking_connection connection_; + std::deque<message> messages_; + std::deque<delivery> deliveries_; + std::deque<delivery> unsettled_; + int refcount_; + pn_link_t *pn_link_; + public: + fetcher(blocking_connection &c, int p); + void incref(); + void decref(); + void on_message(event &e); + void on_link_error(event &e); + void on_connection_error(event &e); + void on_link_init(event &e); + bool has_message(); + message pop(); + void settle(delivery::state state = delivery::NONE); +}; + + +} + +#endif /*!PROTON_CPP_FETCHER_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/messaging_event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_event.cpp b/proton-c/bindings/cpp/src/messaging_event.cpp index d79974d..5062d76 100644 --- a/proton-c/bindings/cpp/src/messaging_event.cpp +++ b/proton-c/bindings/cpp/src/messaging_event.cpp @@ -82,6 +82,14 @@ link messaging_event::link() { throw error(MSG("No link context for event")); } +delivery messaging_event::delivery() { + if (type_ == messaging_event::PROTON) + return proton_event::delivery(); + if (parent_event_) + return parent_event_->delivery(); + throw error(MSG("No delivery context for event")); +} + message messaging_event::message() { if (parent_event_) { pn_message_t *m = event_context(parent_event_->pn_event()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/proton_event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_event.cpp b/proton-c/bindings/cpp/src/proton_event.cpp index 71f2a08..46b43ee 100644 --- a/proton-c/bindings/cpp/src/proton_event.cpp +++ b/proton-c/bindings/cpp/src/proton_event.cpp @@ -78,6 +78,13 @@ link proton_event::link() { throw error(MSG("No link context for this event")); } +delivery proton_event::delivery() { + pn_delivery_t *dlv = pn_event_delivery(pn_event()); + if (dlv) + return proton::delivery(dlv); + throw error(MSG("No delivery context for this event")); +} + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp index 84412e6..b9ed680 100644 --- a/proton-c/bindings/cpp/src/receiver.cpp +++ b/proton-c/bindings/cpp/src/receiver.cpp @@ -40,5 +40,8 @@ void receiver::verify_type(pn_link_t *lnk) { throw error(MSG("Creating receiver with sender context")); } +void receiver::flow(int count) { + pn_link_flow(pn_link(), count); +} } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
