Repository: qpid-proton Updated Branches: refs/heads/master b53a684e7 -> deccf354a
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/sender.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp index 6f13abe..43c0747 100644 --- a/proton-c/bindings/cpp/include/proton/sender.hpp +++ b/proton-c/bindings/cpp/include/proton/sender.hpp @@ -25,6 +25,7 @@ #include "proton/export.hpp" #include "proton/link.hpp" #include "proton/message.hpp" +#include "proton/tracker.hpp" #include "proton/types.h" #include <string> @@ -33,8 +34,6 @@ struct pn_connection_t; namespace proton { -class tracker; - /// A link for sending messages. class PN_CPP_CLASS_EXTERN sender : public internal::link http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/work_queue.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp new file mode 100644 index 0000000..1fb84ce --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp @@ -0,0 +1,75 @@ +#ifndef PROTON_WORK_QUEUE_HPP +#define PROTON_WORK_QUEUE_HPP + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/handler.hpp> +#include <proton/connection_options.hpp> + +#include <functional> +#include <memory> + +namespace proton { + +class connection; + +/// A work_queue takes work (in the form of function objects) that will be +/// serialized with other activity on a connection. Typically the work is a call +/// to user-defined member functions on the handler(s) associated with a +/// connection, which will be called serialized with respect to +/// proton::handler::on_* event functions. +/// +class work_queue : public std::enable_shared_from_this<work_queue> { + public: + work_queue(const work_queue&) = delete; + virtual ~work_queue() {} + + /// Get the work_queue associated with a connection. + /// @throw proton::error if this is not a controller-managed connection. + PN_CPP_EXTERN static std::shared_ptr<work_queue> get(const proton::connection&); + + /// push a function object on the queue to be invoked in a safely serialized + /// way. + /// + /// @return true if `f()` was pushed and will be called. False if the + /// work_queue is already closed and f() will never be called. + /// + /// Note 1: On returning true, the application can rely on f() being called + /// eventually. However f() should check the state when it executes as + /// links, sessions or even the connection may have closed by the time f() + /// is executed. + /// + /// Note 2: You must not push() in a handler or work_queue function on the + /// *same connection* as the work_queue you are pushing to. That could cause + /// a deadlock. + /// + virtual bool push(std::function<void()>) = 0; + + /// Get the controller associated with this work_queue. 
+ virtual class controller& controller() const = 0; + + protected: + work_queue() {} +}; + +} + + +#endif // PROTON_WORK_QUEUE_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/connection_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp index 4a0956b..19a494a 100644 --- a/proton-c/bindings/cpp/src/connection_options.cpp +++ b/proton-c/bindings/cpp/src/connection_options.cpp @@ -145,9 +145,11 @@ class connection_options::impl { }; connection_options::connection_options() : impl_(new impl()) {} + connection_options::connection_options(const connection_options& x) : impl_(new impl()) { *this = x; } + connection_options::~connection_options() {} connection_options& connection_options::operator=(const connection_options& x) { @@ -155,7 +157,16 @@ connection_options& connection_options::operator=(const connection_options& x) { return *this; } -void connection_options::update(const connection_options& x) { impl_->update(*x.impl_); } +connection_options& connection_options::update(const connection_options& x) { + impl_->update(*x.impl_); + return *this; +} + +connection_options connection_options::update(const connection_options& x) const { + connection_options copy(*this); + copy.update(x); + return copy; +} connection_options& connection_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; } connection_options& connection_options::max_frame_size(uint32_t n) { impl_->max_frame_size = n; return *this; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp index a60c1fa..0aa539e 100644 --- a/proton-c/bindings/cpp/src/contexts.hpp 
+++ b/proton-c/bindings/cpp/src/contexts.hpp @@ -40,6 +40,8 @@ struct pn_acceptor_t; namespace proton { class proton_handler; +class work_queue; + // Base class for C++ classes that are used as proton contexts. // Contexts are pn_objects managed by pn reference counts, the C++ value is allocated in-place. @@ -81,12 +83,13 @@ class context { // Connection context used by all connections. class connection_context : public context { public: - connection_context() : default_session(0) {} + connection_context() : default_session(0), work_queue(0) {} // Used by all connections pn_session_t *default_session; // Owned by connection. message event_message; // re-used by messaging_adapter for performance. id_generator link_gen; // Link name generator. + class work_queue* work_queue; // Work queue if this is proton::controller connection. internal::pn_unique_ptr<proton_handler> handler; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/controller.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/controller.cpp b/proton-c/bindings/cpp/src/controller.cpp new file mode 100644 index 0000000..73403c2 --- /dev/null +++ b/proton-c/bindings/cpp/src/controller.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "contexts.hpp" + +#include <proton/error.hpp> +#include <proton/controller.hpp> +#include <proton/work_queue.hpp> + +#include <proton/io/default_controller.hpp> + +#include <utility> +#include <memory> + +static proton::io::default_controller::make_fn make_default_controller; + +namespace proton { + +std::unique_ptr<controller> controller::create() { + if (!make_default_controller) + throw error("no default controller"); + return make_default_controller(); +} + +controller& controller::get(const proton::connection& c) { + return work_queue::get(c)->controller(); +} + +std::shared_ptr<work_queue> work_queue::get(const proton::connection& c) { + work_queue* wq = connection_context::get(c).work_queue; + if (!wq) + throw proton::error("connection has no controller"); + return wq->shared_from_this(); +} + +namespace io { +// Register a default controller factory. +default_controller::default_controller(default_controller::make_fn f) { + make_default_controller = f; +} +} // namespace io + +} // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/engine_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp index d94abdc..a6fd71e 100644 --- a/proton-c/bindings/cpp/src/engine_test.cpp +++ b/proton-c/bindings/cpp/src/engine_test.cpp @@ -74,8 +74,6 @@ struct in_memory_engine : public connection_engine { /// A pair of engines that talk to each other in-memory. struct engine_pair { byte_stream ab, ba; - connection_engine::container cont; - in_memory_engine a, b; engine_pair(handler& ha, handler& hb, @@ -152,49 +150,7 @@ void test_engine_prefix() { ASSERT_EQUAL("y/1", quick_pop(hb.receivers).name()); } -void test_container_prefix() { - /// Let the container set the options. 
- record_handler ha, hb; - connection_engine::container ca("a"), cb("b"); - engine_pair e(ha, hb, ca.make_options(), cb.make_options()); - - ASSERT_EQUAL("a", e.a.connection().container_id()); - ASSERT_EQUAL("b", e.b.connection().container_id()); - - e.a.connection().open(); - sender s = e.a.connection().open_sender("x"); - ASSERT_EQUAL("1/1", s.name()); - - while (ha.senders.empty() || hb.receivers.empty()) e.process(); - - ASSERT_EQUAL("1/1", quick_pop(ha.senders).name()); - ASSERT_EQUAL("1/1", quick_pop(hb.receivers).name()); - - e.a.connection().open_receiver("y"); - while (ha.receivers.empty() || hb.senders.empty()) e.process(); - ASSERT_EQUAL("1/2", quick_pop(ha.receivers).name()); - ASSERT_EQUAL("1/2", quick_pop(hb.senders).name()); - - // Open a second connection in each container, make sure links have different IDs. - record_handler ha2, hb2; - engine_pair e2(ha2, hb2, ca.make_options(), cb.make_options()); - - ASSERT_EQUAL("a", e2.a.connection().container_id()); - ASSERT_EQUAL("b", e2.b.connection().container_id()); - - e2.b.connection().open(); - receiver r = e2.b.connection().open_receiver("z"); - ASSERT_EQUAL("2/1", r.name()); - - while (ha2.senders.empty() || hb2.receivers.empty()) e2.process(); - - ASSERT_EQUAL("2/1", quick_pop(ha2.senders).name()); - ASSERT_EQUAL("2/1", quick_pop(hb2.receivers).name()); -}; - void test_endpoint_close() { - // Make sure conditions are sent to the remote end. 
- record_handler ha, hb; engine_pair e(ha, hb); e.a.connection().open(); @@ -246,7 +202,6 @@ void test_transport_close() { int main(int, char**) { int failed = 0; RUN_TEST(failed, test_engine_prefix()); - RUN_TEST(failed, test_container_prefix()); RUN_TEST(failed, test_endpoint_close()); RUN_TEST(failed, test_transport_close()); return failed; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/connection_engine.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp index e2a9356..ffef3b7 100644 --- a/proton-c/bindings/cpp/src/io/connection_engine.cpp +++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp @@ -42,38 +42,7 @@ namespace proton { namespace io { -namespace { -std::string make_id(const std::string s="") { - return s.empty() ? uuid::random().str() : s; -} -} - -class connection_engine::container::impl { - public: - impl(const std::string s="") : id_(make_id(s)) {} - - const std::string id_; - id_generator id_gen_; - connection_options options_; -}; - -connection_engine::container::container(const std::string& s) : impl_(new impl(s)) {} - -connection_engine::container::~container() {} - -std::string connection_engine::container::id() const { return impl_->id_; } - -connection_options connection_engine::container::make_options() { - connection_options opts = impl_->options_; - opts.container_id(id()).link_prefix(impl_->id_gen_.next()+"/"); - return opts; -} - -void connection_engine::container::options(const connection_options &opts) { - impl_->options_ = opts; -} - -connection_engine::connection_engine(class handler &h, const connection_options& opts) : +connection_engine::connection_engine(class handler &h, const connection_options& opts): handler_(h), connection_(internal::take_ownership(pn_connection()).get()), transport_(internal::take_ownership(pn_transport()).get()), @@ 
-85,16 +54,12 @@ connection_engine::connection_engine(class handler &h, const connection_options& pn_connection_collect(connection_.pn_object(), collector_.get()); opts.apply(connection_); - // Provide defaults for connection_id and link_prefix if not set. - std::string cid = connection_.container_id(); - if (cid.empty()) { - cid = make_id(); - pn_connection_set_container(connection_.pn_object(), cid.c_str()); - } + // Provide local random defaults for container_id and link_prefix if not set by opts. + if (connection_.container_id().empty()) + pn_connection_set_container(connection_.pn_object(), uuid::random().str().c_str()); id_generator &link_gen = connection_context::get(connection_).link_gen; - if (link_gen.prefix().empty()) { - link_gen.prefix(make_id()+"/"); - } + if (link_gen.prefix().empty()) + link_gen.prefix(uuid::random().str()+"/"); } connection_engine::~connection_engine() { @@ -108,11 +73,15 @@ bool connection_engine::dispatch() { e; e = pn_collector_peek(collector_.get())) { - proton_event(e, 0).dispatch(h); + proton_event pe(e, 0); + try { + pe.dispatch(h); + } catch (const std::exception& e) { + close(error_condition("exception", e.what())); + } pn_collector_pop(collector_.get()); } - return !(pn_transport_closed(transport_.pn_object()) || - (connection().closed() && write_buffer().size == 0)); + return !(pn_transport_closed(transport_.pn_object())); } mutable_buffer connection_engine::read_buffer() { @@ -124,7 +93,8 @@ mutable_buffer connection_engine::read_buffer() { } void connection_engine::read_done(size_t n) { - pn_transport_process(transport_.pn_object(), n); + if (n > 0) + pn_transport_process(transport_.pn_object(), n); } void connection_engine::read_close() { @@ -140,7 +110,8 @@ const_buffer connection_engine::write_buffer() const { } void connection_engine::write_done(size_t n) { - pn_transport_pop(transport_.pn_object(), n); + if (n > 0) + pn_transport_pop(transport_.pn_object(), n); } void connection_engine::write_close() { @@ -161,4 
+132,8 @@ proton::transport connection_engine::transport() const { return transport_; } +void connection_engine::work_queue(class work_queue* wq) { + connection_context::get(connection()).work_queue = wq; +} + }} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/posix/socket.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/posix/socket.cpp b/proton-c/bindings/cpp/src/io/posix/socket.cpp deleted file mode 100644 index 204b530..0000000 --- a/proton-c/bindings/cpp/src/io/posix/socket.cpp +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "msg.hpp" - -#include <proton/error_condition.hpp> -#include <proton/io/socket.hpp> -#include <proton/url.hpp> - -#include <errno.h> -#include <string.h> -#include <fcntl.h> -#include <netdb.h> -#include <sys/socket.h> -#include <sys/select.h> -#include <sys/types.h> -#include <unistd.h> - -namespace proton { -namespace io { -namespace socket { - -io_error::io_error(const std::string& s) : error(s) {} - -const descriptor INVALID_DESCRIPTOR = -1; - -std::string error_str() { - char buf[512] = "Unknown error"; -#ifdef _GNU_SOURCE - // GNU strerror_r returns the message - return ::strerror_r(errno, buf, sizeof(buf)); -#else - // POSIX strerror_r doesn't return the buffer - ::strerror_r(errno, buf, sizeof(buf)); - return std::string(buf) -#endif -} - -namespace { - -template <class T> T check(T result, const std::string& msg=std::string()) { - if (result < 0) throw io_error(msg + error_str()); - return result; -} - -void gai_check(int result, const std::string& msg="") { - if (result) throw io_error(msg + gai_strerror(result)); -} - -} - -void engine::init() { - check(fcntl(socket_, F_SETFL, fcntl(socket_, F_GETFL, 0) | O_NONBLOCK), "set nonblock: "); -} - -engine::engine(descriptor fd, handler& h, const connection_options &opts) - : connection_engine(h, opts), socket_(fd) -{ - init(); -} - -engine::engine(const url& u, handler& h, const connection_options& opts) - : connection_engine(h, opts), socket_(connect(u)) -{ - init(); - connection().open(); -} - -engine::~engine() {} - -void engine::read() { - mutable_buffer rbuf = read_buffer(); - if (rbuf.size > 0) { - ssize_t n = ::read(socket_, rbuf.data, rbuf.size); - if (n > 0) - read_done(n); - else if (n == 0) - read_close(); - else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) - close(error_condition("io_error", error_str())); - } -} - -void engine::write() { - const_buffer wbuf = write_buffer(); - if (wbuf.size > 0) { - ssize_t n = ::write(socket_, wbuf.data, wbuf.size); - if (n > 0) - 
write_done(n); - else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - close(error_condition("io_error", error_str())); - } - } -} - -void engine::run() { - while (dispatch()) { - fd_set rd, wr; - FD_ZERO(&rd); - if (read_buffer().size) - FD_SET(socket_, &rd); - FD_ZERO(&wr); - if (write_buffer().size) - FD_SET(socket_, &wr); - int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL); - if (n < 0) { - close(error_condition("select: ", error_str())); - break; - } - if (FD_ISSET(socket_, &rd)) { - read(); - } - if (FD_ISSET(socket_, &wr)) - write(); - } - ::close(socket_); -} - -namespace { -struct auto_addrinfo { - struct addrinfo *ptr; - auto_addrinfo() : ptr(0) {} - ~auto_addrinfo() { ::freeaddrinfo(ptr); } - addrinfo* operator->() const { return ptr; } -}; -} - -descriptor connect(const proton::url& u) { - descriptor fd = INVALID_DESCRIPTOR; - try{ - auto_addrinfo addr; - gai_check(::getaddrinfo(u.host().empty() ? 0 : u.host().c_str(), - u.port().empty() ? 0 : u.port().c_str(), - 0, &addr.ptr), u.str()+": "); - fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect: "); - check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: "); - return fd; - } catch (...) { - if (fd >= 0) close(fd); - throw; - } -} - -listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_DESCRIPTOR) { - try { - auto_addrinfo addr; - gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(), - port.empty() ? 0 : port.c_str(), 0, &addr.ptr), - "listener address invalid: "); - socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listen: "); - int yes = 1; - check(setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt: "); - check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "bind: "); - check(::listen(socket_, 32), "listen: "); - } catch (...) 
{ - if (socket_ >= 0) close(socket_); - throw; - } -} - -listener::~listener() { ::close(socket_); } - -descriptor listener::accept(std::string& host_str, std::string& port_str) { - struct sockaddr_storage addr; - socklen_t size = sizeof(addr); - int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: "); - char host[NI_MAXHOST], port[NI_MAXSERV]; - gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr), - host, sizeof(host), port, sizeof(port), 0), - "accept invalid remote address: "); - host_str = host; - port_str = port; - return fd; -} - -// Empty stubs, only needed on windows. -void initialize() {} -void finalize() {} - -}}} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/windows/socket.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/windows/socket.cpp b/proton-c/bindings/cpp/src/io/windows/socket.cpp deleted file mode 100644 index f312525..0000000 --- a/proton-c/bindings/cpp/src/io/windows/socket.cpp +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "msg.hpp" - -#include <proton/io/socket.hpp> -#include <proton/url.hpp> - -#define FD_SETSIZE 2048 -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif -#if _WIN32_WINNT < 0x0501 -#error "Proton requires Windows API support for XP or later." -#endif -#include <winsock2.h> -#include <mswsock.h> -#include <Ws2tcpip.h> - -#include <ctype.h> -#include <errno.h> -#include <stdio.h> -#include <assert.h> - -namespace proton { -namespace io { -namespace socket { - -const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET; - -std::string error_str() { - HRESULT code = WSAGetLastError(); - char err[1024] = {0}; - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | - FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL); - return err; -} - -io_error::io_error(const std::string& s) : error(s) {} - -namespace { - -template <class T> T check(T result, const std::string& msg=std::string()) { - if (result == SOCKET_ERROR) - throw io_error(msg + error_str()); - return result; -} - -void gai_check(int result, const std::string& msg="") { - if (result) - throw io_error(msg + gai_strerror(result)); -} - -} // namespace - -void initialize() { - WSADATA unused; - check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2 -} - -void finalize() { - WSACleanup(); -} - -void engine::init() { - u_long nonblock = 1; - check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: "); -} - -engine::engine(descriptor fd, handler& h, const connection_options &opts) - : connection_engine(h, opts), socket_(fd) -{ - init(); -} - -engine::engine(const url& u, handler& h, const connection_options &opts) - : connection_engine(h, opts), socket_(connect(u)) -{ - init(); - connection().open(); -} - -engine::~engine() {} - -void engine::read() { - mutable_buffer rbuf = read_buffer(); - if (rbuf.size > 0) { - int n = ::recv(socket_, rbuf.data, rbuf.size, 0); - if (n > 0) - read_done(n); - else if (n == 0) - read_close(); - 
else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK) - close(error_condition("io_error", error_str())); - } -} - -void engine::write() { - const_buffer wbuf = write_buffer(); - if (wbuf.size > 0) { - int n = ::send(socket_, wbuf.data, wbuf.size, 0); - if (n > 0) - write_done(n); - else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK) - close(error_condition("io_error", error_str())); - } -} - -void engine::run() { - while (dispatch()) { - fd_set rd, wr; - FD_ZERO(&rd); - if (read_buffer().size) - FD_SET(socket_, &rd); - FD_ZERO(&wr); - if (write_buffer().size) - FD_SET(socket_, &wr); - int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL); - if (n < 0) { - close(error_condition("select: ", error_str())); - break; - } - if (FD_ISSET(socket_, &rd)) { - read(); - } - if (FD_ISSET(socket_, &wr)) - write(); - } - ::closesocket(socket_); -} - -namespace { -struct auto_addrinfo { - struct addrinfo *ptr; - auto_addrinfo() : ptr(0) {} - ~auto_addrinfo() { ::freeaddrinfo(ptr); } - addrinfo* operator->() const { return ptr; } -}; - -static const char *amqp_service(const char *port) { - // Help older Windows to know about amqp[s] ports - if (port) { - if (!strcmp("amqp", port)) return "5672"; - if (!strcmp("amqps", port)) return "5671"; - } - return port; -} -} - - -descriptor connect(const proton::url& u) { - // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets - std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host(); - descriptor fd = INVALID_SOCKET; - try{ - auto_addrinfo addr; - gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(), - amqp_service(u.port().empty() ? 0 : u.port().c_str()), - 0, &addr.ptr), - "connect address invalid: "); - fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: "); - check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: "); - return fd; - } catch (...) 
{ - if (fd != INVALID_SOCKET) ::closesocket(fd); - throw; - } -} - -listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) { - try { - auto_addrinfo addr; - gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(), - port.empty() ? 0 : port.c_str(), 0, &addr.ptr), - "listener address invalid: "); - socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: "); - bool yes = true; - check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: "); - check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: "); - check(::listen(socket_, 32), "listener listen: "); - } catch (...) { - if (socket_ != INVALID_SOCKET) ::closesocket(socket_); - throw; - } -} - -listener::~listener() { ::closesocket(socket_); } - -descriptor listener::accept(std::string& host_str, std::string& port_str) { - struct sockaddr_storage addr; - socklen_t size = sizeof(addr); - int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: "); - char host[NI_MAXHOST], port[NI_MAXSERV]; - gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr), - host, sizeof(host), port, sizeof(port), 0), - "accept invalid remote address: "); - host_str = host; - port_str = port; - return fd; -} - -}}} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp index 1981726..a1ba250 100644 --- a/proton-c/bindings/cpp/src/messaging_adapter.cpp +++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp @@ -68,7 +68,10 @@ void messaging_adapter::on_link_flow(proton_event &pe) { pn_event_t *pne = pe.pn_event(); pn_link_t *lnk = pn_event_link(pne); sender s(lnk); - if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) { + int state = pn_link_state(lnk); + 
if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0 && + (state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) + { // create on_message extended event delegate_.on_sendable(s); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/tests/tools/apps/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/tools/apps/cpp/CMakeLists.txt b/tests/tools/apps/cpp/CMakeLists.txt index 0c120f2..2bc1bc5 100644 --- a/tests/tools/apps/cpp/CMakeLists.txt +++ b/tests/tools/apps/cpp/CMakeLists.txt @@ -17,7 +17,7 @@ # under the License. # -include_directories("${CMAKE_SOURCE_DIR}/examples/cpp") +include_directories("${CMAKE_SOURCE_DIR}/examples/cpp" "${CMAKE_SOURCE_DIR}/examples/cpp/lib") add_executable(reactor_send_cpp reactor_send.cpp) target_link_libraries(reactor_send_cpp qpid-proton qpid-proton-cpp) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/tests/tools/apps/cpp/reactor_send.cpp ---------------------------------------------------------------------- diff --git a/tests/tools/apps/cpp/reactor_send.cpp b/tests/tools/apps/cpp/reactor_send.cpp index a3dc003..d4045b4 100644 --- a/tests/tools/apps/cpp/reactor_send.cpp +++ b/tests/tools/apps/cpp/reactor_send.cpp @@ -114,7 +114,7 @@ int main(int argc, char **argv) { int message_count = 10; int message_size = 100; bool replying = false; - options opts(argc, argv); + example::options opts(argc, argv); opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); opts.add_value(message_count, 'c', "messages", "send COUNT messages", "COUNT"); opts.add_value(message_size, 'b', "bytes", "send binary messages BYTES long", "BYTES"); @@ -124,7 +124,7 @@ int main(int argc, char **argv) { reactor_send send(address, message_count, message_size, replying); proton::container(send).run(); return 0; - } catch (const bad_option& e) { + } catch (const example::bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; } catch (const 
std::exception& e) { std::cerr << e.what() << std::endl; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
