PROTON-1046: C++ multi-threaded controller and improved broker example A complete portable multi-threaded API for proton that can be implemented on any threading/IO platform.
API: - proton::controller: A multi-threaded alternative to the proton::container. - proton::work_queue: async functions serialized per-connection. Examples: - mt/epoll_controller.hpp: controller/work_queue implemented using native Linux epoll. - mt/broker.cpp: multi-threaded broker, portable over any controller implementation. - illustrates multi-threading, use of work_queue, remote shutdown TODO: - Examples and implementations for non-Linux platforms. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/deccf354 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/deccf354 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/deccf354 Branch: refs/heads/master Commit: deccf354a653e2106f40cdd59df9b67b74911e8b Parents: b53a684 Author: Alan Conway <[email protected]> Authored: Thu Mar 31 17:12:18 2016 -0400 Committer: Alan Conway <[email protected]> Committed: Wed Apr 27 10:39:59 2016 -0400 ---------------------------------------------------------------------- config.sh.in | 2 +- examples/cpp/CMakeLists.txt | 25 +- examples/cpp/README.dox | 99 ++-- examples/cpp/broker.cpp | 4 +- examples/cpp/client.cpp | 4 +- examples/cpp/direct_recv.cpp | 4 +- examples/cpp/direct_send.cpp | 4 +- examples/cpp/engine/CMakeLists.txt | 37 -- examples/cpp/engine/broker.cpp | 176 ------- examples/cpp/engine/client.cpp | 103 ---- examples/cpp/engine/direct_recv.cpp | 79 --- examples/cpp/engine/direct_send.cpp | 91 ---- examples/cpp/engine/helloworld.cpp | 68 --- examples/cpp/engine/options.hpp | 173 ------- examples/cpp/engine/server.cpp | 90 ---- examples/cpp/engine/simple_recv.cpp | 85 --- examples/cpp/engine/simple_send.cpp | 93 ---- examples/cpp/example/socket_windows.cpp | 218 ++++++++ examples/cpp/example_test.py | 106 ++-- examples/cpp/mt/broker.cpp | 280 ++++++++++ examples/cpp/mt/epoll_controller.cpp | 517 +++++++++++++++++++ examples/cpp/options.hpp | 2 + 
examples/cpp/recurring_timer.cpp | 4 +- examples/cpp/server.cpp | 4 +- examples/cpp/server_direct.cpp | 4 +- examples/cpp/simple_recv.cpp | 4 +- examples/cpp/simple_send.cpp | 4 +- examples/cpp/tutorial.dox | 403 +++++++++++++++ proton-c/bindings/cpp/CMakeLists.txt | 13 +- proton-c/bindings/cpp/cpp.cmake | 3 + proton-c/bindings/cpp/docs/mainpage.md | 152 +++--- proton-c/bindings/cpp/docs/mt_page.md | 21 + proton-c/bindings/cpp/docs/tutorial.dox | 428 --------------- proton-c/bindings/cpp/docs/user.doxygen.in | 3 +- .../cpp/include/proton/connection_options.hpp | 9 +- .../bindings/cpp/include/proton/controller.hpp | 118 +++++ proton-c/bindings/cpp/include/proton/error.hpp | 7 +- .../bindings/cpp/include/proton/handler.hpp | 12 + .../cpp/include/proton/io/connection_engine.hpp | 88 ++-- .../include/proton/io/default_controller.hpp | 47 ++ .../bindings/cpp/include/proton/io/socket.hpp | 130 ----- proton-c/bindings/cpp/include/proton/sender.hpp | 3 +- .../bindings/cpp/include/proton/work_queue.hpp | 75 +++ .../bindings/cpp/src/connection_options.cpp | 13 +- proton-c/bindings/cpp/src/contexts.hpp | 5 +- proton-c/bindings/cpp/src/controller.cpp | 59 +++ proton-c/bindings/cpp/src/engine_test.cpp | 45 -- .../bindings/cpp/src/io/connection_engine.cpp | 67 +-- proton-c/bindings/cpp/src/io/posix/socket.cpp | 196 ------- proton-c/bindings/cpp/src/io/windows/socket.cpp | 218 -------- proton-c/bindings/cpp/src/messaging_adapter.cpp | 5 +- tests/tools/apps/cpp/CMakeLists.txt | 2 +- tests/tools/apps/cpp/reactor_send.cpp | 4 +- 53 files changed, 2054 insertions(+), 2352 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/config.sh.in ---------------------------------------------------------------------- diff --git a/config.sh.in b/config.sh.in index 744ddb3..5eb779b 100755 --- a/config.sh.in +++ b/config.sh.in @@ -73,7 +73,7 @@ export LD_LIBRARY_PATH="$(merge_paths 
$PROTON_BUILD/proton-c $LD_LIBRARY_PATH)" export PATH="$(merge_paths $PATH $PROTON_BUILD/tests/tools/apps/c $PROTON_HOME/tests/tools/apps/python $PROTON_HOME/tests/python)" # can the test harness use valgrind? -if [[ -x "$(type -p valgrind)" ]] ; then +if [[ -x "$(type -p valgrind)" && "@ENABLE_VALGRIND" == "ON" ]] ; then export VALGRIND=$(type -p valgrind) fi http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 4f6b742..3a81718 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -20,7 +20,10 @@ find_package(ProtonCpp REQUIRED) include_directories(${ProtonCpp_INCLUDE_DIRS}) +link_libraries(${ProtonCpp_LIBRARIES}) +add_compile_options(${CXX_WARNING_FLAGS}) +# Single-threaded examples. foreach(example broker helloworld @@ -40,12 +43,9 @@ foreach(example ssl_client_cert encode_decode) add_executable(${example} ${example}.cpp) - target_link_libraries(${example} ${ProtonCpp_LIBRARIES}) - set_source_files_properties(${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}") endforeach() -add_subdirectory(engine) - +# Python test runner set(env_py ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py) function(set_test_path dir) @@ -61,7 +61,16 @@ set_test_path("$<TARGET_FILE_DIR:broker>") add_test(NAME cpp_container_example_test COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleTest) -set_test_path("$<TARGET_FILE_DIR:engine-broker>") - -add_test(NAME cpp_engine_example_test - COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ConnectionEngineExampleTest) +# TODO aconway 2016-04-26: need portable MT and IO examples. 
+if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND BUILD_CPP_MT) + set(controller_src mt/epoll_controller.cpp) + foreach(example + broker + ) + add_executable(mt_${example} mt/${example}.cpp ${controller_src}) + target_link_libraries(mt_${example} pthread) + set_target_properties(mt_${example} PROPERTIES CXX_STANDARD 11) + endforeach() + add_test(NAME cpp_mt_example_test + COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v MtBrokerTest) +endif() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/README.dox ---------------------------------------------------------------------- diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox index 1e78774..d545366 100644 --- a/examples/cpp/README.dox +++ b/examples/cpp/README.dox @@ -1,15 +1,22 @@ -// Examples overview. +// C++ examples list (doxygen format) // -// For a better overview, see the tutorial in the generated documentation. -// -// In your build directory do: +// For a tutorial-style description of the examples see tutorial.dox. +// To build the full HTML tutorial and documentation, in your build directory do: // // make docs-cpp // // then open proton-c/bindings/cpp/docs/html/tutorial.html in your browser. -// DEVELOPER NOTE: if you are adding or modifying examples you should keep this -// file and ../proton-c/bindings/cpp/docs/tutorial.hpp up to date. +// DEVELOPER NOTE: if you add or modify examples, please add/update a short +// description below and (if appropriate) extend/update tutorial.dox. + +/** example sub directory + +The example sub-directory has utilities classes to make the example simpler, +these classes are not directly related to the use of proton so are in a separate +`example` directory and namespace. + +*/ /** @example helloworld.cpp @@ -46,7 +53,7 @@ on 127.0.0.1:5672. Simply prints out the body of received messages. 
/** @example direct_send.cpp Accepts an incoming connection and then sends like `simple_send`. You can -connect directly to `direct_send` *without* a broker using \ref simple_recv.cpp. +connect directly to `direct_send` *without* a broker using @ref simple_recv.cpp. Make sure to stop the broker first or use a different port for `direct_send`. */ @@ -54,7 +61,7 @@ Make sure to stop the broker first or use a different port for `direct_send`. /** @example direct_recv.cpp Accepts an incoming connection and then receives like `simple_recv`. You can -connect directly to `direct_recv` *without* a broker using \ref simple_send.cpp. +connect directly to `direct_recv` *without* a broker using @ref simple_send.cpp. Make sure to stop the broker first or use a different port for `direct_recv`. */ @@ -108,9 +115,6 @@ automatically when a client tries to send or subscribe. This file contains the `queue` class that queues messages and the `broker_handler` class that manages queues and links and transfers messages to/from clients. -Examples \ref broker.cpp and \ref engine/broker.cpp use this same -broker logic but show different ways to run it in a server application. - */ /** @example broker.cpp @@ -120,79 +124,40 @@ to run other examples that reqiure an intermediary, or you can use any AMQP 1.0 broker. This broker creates queues automatically when a client tries to send or subscribe. -Uses the broker logic from \ref broker.hpp, the same logic as the -`proton::connection_engine` broker example \ref engine/broker.cpp. - */ -//////////////// connection_engine examples. +/** @example mt/epoll_controller.cpp -/** \example engine/helloworld.cpp +An example implementation of the proton::mt::controller API that shows how to +use the prton::io::connection_engine SPI to adapt the proton API to native +IO. In this case using a multi-threaded Linux epoll poller as the implementation. -`proton::connection_engine` example to send a "Hello World" message to -itself. 
Compare with the corresponding `proton::container` example \ref -helloworld.cpp. +__Requires C++11__ */ -/** \example engine/simple_send.cpp +/** @example mt/broker.cpp -`proton::connection_engine` example of sending a fixed number of messages and -tracking their (asynchronous) acknowledgement. Messages are sent through the -'examples' node on an intermediary accessible on 127.0.0.1:5672. +A multi-threaded broker, using the proton::mt extensions. This broker is +portable over any implementation of the proton::mt API, see @ref +mt/epoll_controller.cpp for an example. -*/ - -/** \example engine/simple_recv.cpp - -`proton::connection_engine` example that subscribes to the 'examples' node and prints - the body of received messages. +__Requires C++11__ */ -/** \example engine/direct_send.cpp +/** @example mt/simple_send.cpp -`proton::connection_engine` example accepts an incoming connection and then -sends like `simple_send`. You can connect directly to `direct_send` *without* a -broker using \ref simple_recv.cpp. Make sure to stop the broker first or use a -different port for `direct_send`. +A multi-threaded sender client. Sends messages concurrently to multiple addresses. -*/ - -/** \example engine/direct_recv.cpp - -`proton::connection_engine` example accepts an incoming connection and then -receives like `simple_recv`. You can connect directly to `direct_recv` -*without* a broker using \ref simple_send.cpp. Make sure to stop the broker -first or use a different port for `direct_recv`. +__Requires C++11__ */ -/** \example engine/client.cpp +/** @example mt/simple_recv.cpp -`proton::connection_engine` client for request-response example. Sends requests and -prints out responses. Requires an intermediary that supports the AMQP 1.0 -dynamic nodes on which the responses are received. The requests are sent through -the 'examples' node. +A multi-threaded receiver client. Receives messages concurrently to multiple addresses. 
-*/ +__Requires C++11__ -/** \example engine/server.cpp - -`proton::connection_engine` server for request-response example, that receives -requests via the examples node, converts the body to uppercase and sends the -result back to the indicated reply address. - -*/ - -/** \example engine/broker.cpp - -A simple, single-threaded broker using the `proton::container`. You can use this -to run other examples that reqiure an intermediary, or you can use any AMQP 1.0 -broker. This broker creates queues automatically when a client tries to send or -subscribe. - -Uses the broker logic from \ref broker.hpp, the same logic as the -proton::container` broker example \ref broker.cpp. - -*/ +*/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp index 37839c6..a19997f 100644 --- a/examples/cpp/broker.cpp +++ b/examples/cpp/broker.cpp @@ -61,7 +61,7 @@ class broker { int main(int argc, char **argv) { proton::url url("0.0.0.0"); - options opts(argc, argv); + example::options opts(argc, argv); opts.add_value(url, 'a', "address", "listen on URL", "URL"); @@ -72,7 +72,7 @@ int main(int argc, char **argv) { proton::container(b.handler()).run(); return 0; - } catch (const bad_option& e) { + } catch (const example::bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << e.what() << std::endl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp index 0c38ac6..494294e 100644 --- a/examples/cpp/client.cpp +++ b/examples/cpp/client.cpp @@ -80,7 +80,7 @@ class client : public proton::handler { int main(int argc, char **argv) { proton::url 
url("127.0.0.1:5672/examples"); - options opts(argc, argv); + example::options opts(argc, argv); opts.add_value(url, 'a', "address", "connect and send to URL", "URL"); @@ -97,7 +97,7 @@ int main(int argc, char **argv) { proton::container(c).run(); return 0; - } catch (const bad_option& e) { + } catch (const example::bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << e.what() << std::endl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/direct_recv.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/direct_recv.cpp b/examples/cpp/direct_recv.cpp index f999869..76bbaf9 100644 --- a/examples/cpp/direct_recv.cpp +++ b/examples/cpp/direct_recv.cpp @@ -72,7 +72,7 @@ class direct_recv : public proton::handler { int main(int argc, char **argv) { std::string address("127.0.0.1:5672/examples"); int message_count = 100; - options opts(argc, argv); + example::options opts(argc, argv); opts.add_value(address, 'a', "address", "listen and receive on URL", "URL"); opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT"); @@ -84,7 +84,7 @@ int main(int argc, char **argv) { proton::container(recv).run(); return 0; - } catch (const bad_option& e) { + } catch (const example::bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << e.what() << std::endl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/direct_send.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/direct_send.cpp b/examples/cpp/direct_send.cpp index 0b63ec5..860acc4 100644 --- a/examples/cpp/direct_send.cpp +++ b/examples/cpp/direct_send.cpp @@ -82,7 +82,7 @@ class simple_send : public proton::handler { int main(int argc, char **argv) { std::string address("127.0.0.1:5672/examples"); int message_count = 
100; - options opts(argc, argv); + example::options opts(argc, argv); opts.add_value(address, 'a', "address", "listen and send on URL", "URL"); opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT"); @@ -94,7 +94,7 @@ int main(int argc, char **argv) { proton::container(send).run(); return 0; - } catch (const bad_option& e) { + } catch (const example::bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << e.what() << std::endl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/CMakeLists.txt b/examples/cpp/engine/CMakeLists.txt deleted file mode 100644 index bafa20c..0000000 --- a/examples/cpp/engine/CMakeLists.txt +++ /dev/null @@ -1,37 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -find_package(ProtonCpp REQUIRED) - -include_directories(${ProtonCpp_INCLUDE_DIRS}) - -foreach(example - broker - helloworld - simple_recv - simple_send - direct_recv - direct_send - client - server) - add_executable(engine-${example} ${example}.cpp ${extra_source}) - target_link_libraries(engine-${example} ${ProtonCpp_LIBRARIES}) - set_source_files_properties(engine-${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}") - set_target_properties(engine-${example} PROPERTIES OUTPUT_NAME ${example}) -endforeach() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp deleted file mode 100644 index bfe84fc..0000000 --- a/examples/cpp/engine/broker.cpp +++ /dev/null @@ -1,176 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "../options.hpp" -#include "../broker.hpp" - -#include <iostream> - -#ifndef WIN32 // TODO aconway 2016-03-23: windows broker example -#include <proton/io/socket.hpp> -#include <sys/select.h> -#include <set> - -template <class T> T check(T result, const std::string& msg="io_error: ") { - if (result < 0) - throw proton::io::socket::io_error(msg + proton::io::socket::error_str()); - return result; -} - -void fd_set_if(bool on, int fd, fd_set *fds); - -class broker { - typedef std::set<proton::io::socket::engine*> engines; - - queues queues_; - broker_handler handler_; - proton::io::connection_engine::container container_; - engines engines_; - fd_set reading_, writing_; - - public: - broker() : handler_(queues_) { - FD_ZERO(&reading_); - FD_ZERO(&writing_); - } - - ~broker() { - for (engines::iterator i = engines_.begin(); i != engines_.end(); ++i) - delete *i; - } - - void run(const proton::url& url) { - proton::io::socket::listener listener(url.host(), url.port()); - std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl; - FD_SET(listener.socket(), &reading_); - while(true) { - fd_set readable_set = reading_; - fd_set writable_set = writing_; - check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select"); - - if (FD_ISSET(listener.socket(), &readable_set)) { - std::string client_host, client_port; - int fd = listener.accept(client_host, client_port); - std::cout << "accepted " << client_host << ":" << client_port - << " fd=" << fd << std::endl; - engines_.insert( - new proton::io::socket::engine( - fd, handler_, container_.make_options())); - FD_SET(fd, &reading_); - FD_SET(fd, &writing_); - } - - for (engines::iterator i = engines_.begin(); i != engines_.end(); ) { - proton::io::socket::engine *eng = *(i++); - int flags = 0; - if (FD_ISSET(eng->socket(), &writable_set)) - eng->write(); - if (FD_ISSET(eng->socket(), &readable_set)) - eng->read(); - if (eng->dispatch()) { - 
fd_set_if(eng->read_buffer().size, eng->socket(), &reading_); - fd_set_if(eng->write_buffer().size, eng->socket(), &writing_); - } else { - std::cout << "closed fd=" << eng->socket() << std::endl; - engines_.erase(eng); - delete eng; - } - } - } - } -}; - -void fd_set_if(bool on, int fd, fd_set *fds) { - if (on) - FD_SET(fd, fds); - else - FD_CLR(fd, fds); -} - -int main(int argc, char **argv) { - // Command line options - std::string address("0.0.0.0"); - options opts(argc, argv); - opts.add_value(address, 'a', "address", "listen on URL", "URL"); - try { - opts.parse(); - broker().run(address); - return 0; - } catch (const bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - return 1; -} -#else // WIN32 - -#include "proton/acceptor.hpp" -#include "proton/container.hpp" -#include "proton/value.hpp" - -#include "../fake_cpp11.hpp" - -class broker { - public: - broker(const proton::url& url) : handler_(url, queues_) {} - - proton::handler& handler() { return handler_; } - - private: - - class my_handler : public broker_handler { - public: - my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {} - - void on_container_start(proton::container &c) override { - c.listen(url_); - std::cout << "broker listening on " << url_ << std::endl; - } - - private: - const proton::url& url_; - }; - - private: - queues queues_; - my_handler handler_; -}; - -int main(int argc, char **argv) { - // Command line options - proton::url url("0.0.0.0"); - options opts(argc, argv); - opts.add_value(url, 'a', "address", "listen on URL", "URL"); - try { - opts.parse(); - broker b(url); - proton::container(b.handler()).run(); - return 0; - } catch (const bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - return 1; -} - -#endif 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp deleted file mode 100644 index 8e58a38..0000000 --- a/examples/cpp/engine/client.cpp +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "options.hpp" -#include "proton/io/socket.hpp" -#include "proton/url.hpp" -#include "proton/delivery.hpp" -#include "proton/handler.hpp" -#include "proton/connection.hpp" -#include "proton/tracker.hpp" -#include "proton/source_options.hpp" - -#include <iostream> -#include <vector> - -#include "../fake_cpp11.hpp" - -using proton::receiver_options; -using proton::source_options; - -class client : public proton::handler { - private: - proton::url url; - std::vector<std::string> requests; - proton::sender sender; - proton::receiver receiver; - - public: - client(const proton::url &u, const std::vector<std::string>& r) : url(u), requests(r) {} - - void on_connection_open(proton::connection &c) override { - sender = c.open_sender(url.path()); - // Create a receiver requesting a dynamically created queue - // for the message source. - receiver_options dynamic_addr = receiver_options().source(source_options().dynamic(true)); - receiver = c.open_receiver("", dynamic_addr); - } - - void send_request() { - proton::message req; - req.body(requests.front()); - req.reply_to(receiver.source().address()); - sender.send(req); - } - - void on_receiver_open(proton::receiver &) override { - send_request(); - } - - void on_message(proton::delivery &d, proton::message &response) override { - if (requests.empty()) return; // Spurious extra message! 
- std::cout << requests.front() << " => " << response.body() << std::endl; - requests.erase(requests.begin()); - if (!requests.empty()) { - send_request(); - } else { - d.connection().close(); - } - } -}; - -int main(int argc, char **argv) { - // Command line options - std::string address("127.0.0.1:5672/examples"); - options opts(argc, argv); - opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); - - try { - opts.parse(); - - std::vector<std::string> requests; - requests.push_back("Twas brillig, and the slithy toves"); - requests.push_back("Did gire and gymble in the wabe."); - requests.push_back("All mimsy were the borogroves,"); - requests.push_back("And the mome raths outgrabe."); - client handler(address, requests); - proton::io::socket::engine(address, handler).run(); - return 0; - } catch (const bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/direct_recv.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp deleted file mode 100644 index 48f4478..0000000 --- a/examples/cpp/engine/direct_recv.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "options.hpp" - -#include "proton/delivery.hpp" -#include "proton/io/socket.hpp" -#include "proton/handler.hpp" -#include "proton/receiver.hpp" -#include "proton/url.hpp" -#include "proton/value.hpp" - -#include <iostream> -#include <map> - -#include "../fake_cpp11.hpp" - -class direct_recv : public proton::handler { - private: - uint64_t expected; - uint64_t received; - - public: - direct_recv(int c) : expected(c), received(0) {} - - void on_message(proton::delivery &d, proton::message &msg) override { - if (msg.id().get<uint64_t>() < received) - return; // ignore duplicate - if (expected == 0 || received < expected) { - std::cout << msg.body() << std::endl; - received++; - } - if (received == expected) { - d.receiver().close(); - d.connection().close(); - } - } -}; - -int main(int argc, char **argv) { - // Command line options - std::string address("127.0.0.1:5672/examples"); - int message_count = 100; - options opts(argc, argv); - opts.add_value(address, 'a', "address", "listen and receive on URL", "URL"); - opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT"); - try { - opts.parse(); - proton::url url(address); - proton::io::socket::listener listener(url.host(), url.port()); - std::cout << "direct_recv listening on " << url << std::endl; - direct_recv handler(message_count); - proton::io::socket::engine(listener.accept(), handler).run(); - return 0; - } catch (const bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << 
e.what() << std::endl; - } - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/direct_send.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp deleted file mode 100644 index 2d9acf0..0000000 --- a/examples/cpp/engine/direct_send.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "options.hpp" - -#include "proton/acceptor.hpp" -#include "proton/connection.hpp" -#include "proton/io/socket.hpp" -#include "proton/url.hpp" -#include "proton/handler.hpp" -#include "proton/tracker.hpp" -#include "proton/value.hpp" - -#include <iostream> -#include <map> - -#include "../fake_cpp11.hpp" - -class simple_send : public proton::handler { - private: - int sent; - int confirmed; - int total; - public: - simple_send(int c) : sent(0), confirmed(0), total(c) {} - - void on_sendable(proton::sender &sender) override { - while (sender.credit() && sent < total) { - proton::message msg; - msg.id(sent + 1); - std::map<std::string, int> m; - m["sequence"] = sent+1; - msg.body(m); - sender.send(msg); - sent++; - } - } - - void on_tracker_accept(proton::tracker &t) override { - confirmed++; - if (confirmed == total) { - std::cout << "all messages confirmed" << std::endl; - t.connection().close(); - } - } - - void on_transport_close(proton::transport &) override { - sent = confirmed; - } -}; - -int main(int argc, char **argv) { - // Command line options - std::string address("127.0.0.1:5672/examples"); - int message_count = 100; - options opts(argc, argv); - opts.add_value(address, 'a', "address", "listen and send on URL", "URL"); - opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT"); - try { - opts.parse(); - proton::url url(address); - proton::io::socket::listener listener(url.host(), url.port()); - std::cout << "direct_send listening on " << url << std::endl; - simple_send handler(message_count); - proton::io::socket::engine(listener.accept(), handler).run(); - return 0; - } catch (const bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/helloworld.cpp 
---------------------------------------------------------------------- diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp deleted file mode 100644 index a4f23ef..0000000 --- a/examples/cpp/engine/helloworld.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/delivery.hpp" -#include "proton/handler.hpp" -#include "proton/tracker.hpp" -#include "proton/url.hpp" -#include "proton/io/socket.hpp" - -#include <iostream> - -#include "../fake_cpp11.hpp" - -class hello_world : public proton::handler { - private: - std::string address_; - - public: - hello_world(const std::string& address) : address_(address) {} - - void on_connection_open(proton::connection &c) override { - c.open_receiver(address_); - c.open_sender(address_); - } - - void on_sendable(proton::sender &s) override { - proton::message m("Hello World!"); - s.send(m); - s.close(); - } - - void on_message(proton::delivery &d, proton::message &m) override { - std::cout << m.body() << std::endl; - d.connection().close(); - } -}; - -int main(int argc, char **argv) { - try { - proton::url url(argc > 1 ? 
argv[1] : "127.0.0.1:5672/examples"); - hello_world hw(url.path()); - proton::io::socket::engine(url, hw).run(); - - return 0; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/options.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/options.hpp b/examples/cpp/engine/options.hpp deleted file mode 100644 index bd477b5..0000000 --- a/examples/cpp/engine/options.hpp +++ /dev/null @@ -1,173 +0,0 @@ -#ifndef OPTIONS_HPP -#define OPTIONS_HPP -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include <string> -#include <sstream> -#include <ostream> -#include <vector> -#include <stdexcept> - -/** bad_option is thrown for option parsing errors */ -struct bad_option : public std::runtime_error { - bad_option(const std::string& s) : std::runtime_error(s) {} -}; - -/** Simple command-line option parser for example programs */ -class options { - public: - - options(int argc, char const * const * argv) : argc_(argc), argv_(argv), prog_(argv[0]), help_() { - size_t slash = prog_.find_last_of("/\\"); - if (slash != std::string::npos) - prog_ = prog_.substr(slash+1); // Extract prog name from path - add_flag(help_, 'h', "help", "Print the help message"); - } - - ~options() { - for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i) - delete *i; - } - - /** Updates value when parse() is called if option is present with a value. */ - template<class T> - void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string var) { - opts_.push_back(new option_value<T>(value, short_name, long_name, description, var)); - } - - /** Sets flag when parse() is called if option is present. */ - void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) { - opts_.push_back(new option_flag(flag, short_name, long_name, description)); - } - - /** Parse the command line, return the index of the first non-option argument. - *@throws bad_option if there is a parsing error or unknown option. 
- */ - int parse() { - int arg = 1; - for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) { - opts::iterator i = opts_.begin(); - while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg)) - ++i; - if (i == opts_.end()) - throw bad_option(std::string("unknown option ") + argv_[arg]); - } - if (help_) throw bad_option(""); - return arg; - } - - /** Print a usage message */ - friend std::ostream& operator<<(std::ostream& os, const options& op) { - os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl; - os << std::endl << "options:" << std::endl; - for (opts::const_iterator i = op.opts_.begin(); i < op.opts_.end(); ++i) - os << **i << std::endl; - return os; - } - - private: - class option { - public: - option(char s, const std::string& l, const std::string& d, const std::string v) : - short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {} - virtual ~option() {} - - virtual bool parse(int argc, char const * const * argv, int &i) = 0; - virtual void print_default(std::ostream&) const {}; - - friend std::ostream& operator<<(std::ostream& os, const option& op) { - os << " " << op.short_; - if (!op.var_.empty()) os << " " << op.var_; - os << ", " << op.long_; - if (!op.var_.empty()) os << "=" << op.var_; - os << std::endl << " " << op.desc_; - op.print_default(os); - return os; - } - - protected: - std::string short_, long_, desc_, var_; - }; - - template <class T> - class option_value : public option { - public: - option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) : - option(s, l, d, v), value_(value) {} - - bool parse(int argc, char const * const * argv, int &i) { - std::string arg(argv[i]); - if (arg == short_ || arg == long_) { - if (i < argc-1) { - set_value(arg, argv[++i]); - return true; - } else { - throw bad_option("missing value for " + arg); - } - } - if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) { - set_value(long_, arg.substr(long_.size()+1)); - return 
true; - } - return false; - } - - virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; } - - void set_value(const std::string& opt, const std::string& s) { - std::istringstream is(s); - is >> value_; - if (is.fail() || is.bad()) - throw bad_option("bad value for " + opt + ": " + s); - } - - private: - T& value_; - }; - - class option_flag: public option { - public: - option_flag(bool& flag, const char s, const std::string& l, const std::string& d) : - option(s, l, d, ""), flag_(flag) - { flag_ = false; } - - bool parse(int /*argc*/, char const * const * argv, int &i) { - if (argv[i] == short_ || argv[i] == long_) { - flag_ = true; - return true; - } else { - return false; - } - } - - private: - bool &flag_; - }; - - typedef std::vector<option*> opts; - - int argc_; - char const * const * argv_; - std::string prog_; - opts opts_; - bool help_; -}; - -#endif // OPTIONS_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/server.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp deleted file mode 100644 index 31f3599..0000000 --- a/examples/cpp/engine/server.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "options.hpp" - -#include "proton/connection.hpp" -#include "proton/delivery.hpp" -#include "proton/io/socket.hpp" -#include "proton/url.hpp" -#include "proton/handler.hpp" -#include "proton/tracker.hpp" -#include "proton/url.hpp" - -#include <iostream> -#include <map> -#include <string> -#include <cctype> - -#include "../fake_cpp11.hpp" - -class server : public proton::handler { - private: - typedef std::map<std::string, proton::sender> sender_map; - proton::url url; - sender_map senders; - - public: - - server(const std::string &u) : url(u) {} - - void on_connection_open(proton::connection &c) override { - c.open_receiver(url.path()); - std::cout << "server connected to " << url << std::endl; - } - - std::string to_upper(const std::string &s) { - std::string uc(s); - size_t l = uc.size(); - for (size_t i=0; i<l; i++) uc[i] = std::toupper(uc[i]); - return uc; - } - - void on_message(proton::delivery &d, proton::message &m) override { - std::cout << "Received " << m.body() << std::endl; - std::string reply_to = m.reply_to(); - proton::message reply; - reply.to(reply_to); - reply.body(to_upper(proton::get<std::string>(m.body()))); - reply.correlation_id(m.correlation_id()); - if (!senders[reply_to]) - senders[reply_to] = d.connection().open_sender(reply_to); - senders[reply_to].send(reply); - } -}; - -int main(int argc, char **argv) { - // Command line options - std::string address("amqp://0.0.0.0:5672/examples"); - options opts(argc, argv); - opts.add_value(address, 'a', "address", "listen on URL", "URL"); - try { - opts.parse(); - server handler(address); - proton::io::socket::engine(address, handler).run(); - return 0; - } catch (const bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - return 1; -} 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/simple_recv.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp deleted file mode 100644 index ffd80f9..0000000 --- a/examples/cpp/engine/simple_recv.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "options.hpp" - -#include "proton/io/socket.hpp" -#include "proton/url.hpp" -#include "proton/handler.hpp" -#include "proton/receiver.hpp" -#include "proton/value.hpp" -#include "proton/message_id.hpp" -#include "proton/delivery.hpp" - -#include <iostream> -#include <map> - -#include "../fake_cpp11.hpp" - -class simple_recv : public proton::handler { - private: - proton::url url; - proton::receiver receiver; - uint64_t expected; - uint64_t received; - public: - - simple_recv(const std::string &s, int c) : url(s), expected(c), received(0) {} - - void on_connection_open(proton::connection &c) override { - receiver = c.open_receiver(url.path()); - std::cout << "simple_recv listening on " << url << std::endl; - } - - void on_message(proton::delivery& d, proton::message &msg) override { - if (msg.id().get<uint64_t>() < received) - return; // ignore duplicate - if (expected == 0 || received < expected) { - std::cout << msg.body() << std::endl; - received++; - if (received == expected) { - d.receiver().close(); - d.connection().close(); - } - } - } -}; - -int main(int argc, char **argv) { - // Command line options - std::string address("127.0.0.1:5672/examples"); - int message_count = 100; - options opts(argc, argv); - opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL"); - opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT"); - - try { - opts.parse(); - simple_recv handler(address, message_count); - proton::io::socket::engine(address, handler).run(); - return 0; - } catch (const bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/simple_send.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/simple_send.cpp 
b/examples/cpp/engine/simple_send.cpp deleted file mode 100644 index e08f39f..0000000 --- a/examples/cpp/engine/simple_send.cpp +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "options.hpp" - -#include "proton/io/socket.hpp" -#include "proton/url.hpp" -#include "proton/handler.hpp" -#include "proton/connection.hpp" -#include "proton/tracker.hpp" -#include "proton/value.hpp" - -#include <iostream> -#include <map> - -#include "../fake_cpp11.hpp" - -class simple_send : public proton::handler { - private: - proton::url url; - int sent; - int confirmed; - int total; - public: - - simple_send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {} - - void on_connection_open(proton::connection &c) override { - c.open_sender(url.path()); - } - - void on_sendable(proton::sender &sender) override { - while (sender.credit() && sent < total) { - proton::message msg; - msg.id(sent + 1); - std::map<std::string, int> m; - m["sequence"] = sent+1; - msg.body(m); - sender.send(msg); - sent++; - } - } - - void on_tracker_accept(proton::tracker &t) override { - confirmed++; - if (confirmed == total) { - std::cout << "all messages confirmed" << std::endl; - 
t.connection().close(); - } - } - - void on_transport_close(proton::transport &) override { - sent = confirmed; - } -}; - -int main(int argc, char **argv) { - // Command line options - std::string address("127.0.0.1:5672/examples"); - int message_count = 100; - options opts(argc, argv); - opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); - opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT"); - try { - opts.parse(); - simple_send handler(address, message_count); - proton::io::socket::engine(address, handler).run(); - return 0; - } catch (const bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/example/socket_windows.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/example/socket_windows.cpp b/examples/cpp/example/socket_windows.cpp new file mode 100644 index 0000000..f312525 --- /dev/null +++ b/examples/cpp/example/socket_windows.cpp @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "msg.hpp" + +#include <proton/io/socket.hpp> +#include <proton/url.hpp> + +#define FD_SETSIZE 2048 +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <mswsock.h> +#include <Ws2tcpip.h> + +#include <ctype.h> +#include <errno.h> +#include <stdio.h> +#include <assert.h> + +namespace proton { +namespace io { +namespace socket { + +const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET; + +std::string error_str() { + HRESULT code = WSAGetLastError(); + char err[1024] = {0}; + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | + FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL); + return err; +} + +io_error::io_error(const std::string& s) : error(s) {} + +namespace { + +template <class T> T check(T result, const std::string& msg=std::string()) { + if (result == SOCKET_ERROR) + throw io_error(msg + error_str()); + return result; +} + +void gai_check(int result, const std::string& msg="") { + if (result) + throw io_error(msg + gai_strerror(result)); +} + +} // namespace + +void initialize() { + WSADATA unused; + check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2 +} + +void finalize() { + WSACleanup(); +} + +void engine::init() { + u_long nonblock = 1; + check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: "); +} + +engine::engine(descriptor fd, handler& h, const connection_options &opts) + : connection_engine(h, opts), socket_(fd) +{ + init(); +} + +engine::engine(const url& u, handler& h, const connection_options &opts) + : connection_engine(h, opts), socket_(connect(u)) +{ + init(); + connection().open(); +} + +engine::~engine() {} + +void engine::read() { + mutable_buffer rbuf = read_buffer(); + if (rbuf.size > 0) { + int n = ::recv(socket_, rbuf.data, rbuf.size, 0); + if (n > 0) + read_done(n); + else if (n == 0) + read_close(); + 
else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK) + close(error_condition("io_error", error_str())); + } +} + +void engine::write() { + const_buffer wbuf = write_buffer(); + if (wbuf.size > 0) { + int n = ::send(socket_, wbuf.data, wbuf.size, 0); + if (n > 0) + write_done(n); + else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK) + close(error_condition("io_error", error_str())); + } +} + +void engine::run() { + while (dispatch()) { + fd_set rd, wr; + FD_ZERO(&rd); + if (read_buffer().size) + FD_SET(socket_, &rd); + FD_ZERO(&wr); + if (write_buffer().size) + FD_SET(socket_, &wr); + int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL); + if (n < 0) { + close(error_condition("select: ", error_str())); + break; + } + if (FD_ISSET(socket_, &rd)) { + read(); + } + if (FD_ISSET(socket_, &wr)) + write(); + } + ::closesocket(socket_); +} + +namespace { +struct auto_addrinfo { + struct addrinfo *ptr; + auto_addrinfo() : ptr(0) {} + ~auto_addrinfo() { ::freeaddrinfo(ptr); } + addrinfo* operator->() const { return ptr; } +}; + +static const char *amqp_service(const char *port) { + // Help older Windows to know about amqp[s] ports + if (port) { + if (!strcmp("amqp", port)) return "5672"; + if (!strcmp("amqps", port)) return "5671"; + } + return port; +} +} + + +descriptor connect(const proton::url& u) { + // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets + std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host(); + descriptor fd = INVALID_SOCKET; + try{ + auto_addrinfo addr; + gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(), + amqp_service(u.port().empty() ? 0 : u.port().c_str()), + 0, &addr.ptr), + "connect address invalid: "); + fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: "); + check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: "); + return fd; + } catch (...) 
{ + if (fd != INVALID_SOCKET) ::closesocket(fd); + throw; + } +} + +listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) { + try { + auto_addrinfo addr; + gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(), + port.empty() ? 0 : port.c_str(), 0, &addr.ptr), + "listener address invalid: "); + socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: "); + bool yes = true; + check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: "); + check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: "); + check(::listen(socket_, 32), "listener listen: "); + } catch (...) { + if (socket_ != INVALID_SOCKET) ::closesocket(socket_); + throw; + } +} + +listener::~listener() { ::closesocket(socket_); } + +descriptor listener::accept(std::string& host_str, std::string& port_str) { + struct sockaddr_storage addr; + socklen_t size = sizeof(addr); + int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: "); + char host[NI_MAXHOST], port[NI_MAXSERV]; + gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr), + host, sizeof(host), port, sizeof(port), 0), + "accept invalid remote address: "); + host_str = host; + port_str = port; + return fd; +} + +}}} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/example_test.py ---------------------------------------------------------------------- diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py index d228d67..38a5154 100644 --- a/examples/cpp/example_test.py +++ b/examples/cpp/example_test.py @@ -131,60 +131,40 @@ class Proc(Popen): raise ProcError(self, "timeout waiting for exit") -def count_tests(cls): - methods = inspect.getmembers(cls, predicate=inspect.ismethod) - tests = [ i for i,j in methods if i.startswith('test_') ] - return len(tests) - -class CompatSetupClass(object): - # Roughly provides setUpClass and tearDownClass functionality 
for older python versions - # in our test scenarios - def __init__(self, target): - self.completed = False - self.test_count = count_tests(target) - self.target = target - self.global_setup = False - - def note_setup(self): - if not self.global_setup: - self.global_setup = True - self.target.setup_class() - - def note_teardown(self): - self.test_count -= 1 - if self.test_count == 0: - self.completed = True - self.target.teardown_class() - - -class ExampleTestCase(unittest.TestCase): - - @classmethod - def setup_class(cls): - pass - - @classmethod - def teardown_class(cls): - pass - - def completed(self): - cls = self.__class__ - return cls.compat_ and cls.compat_.completed - +if hasattr(unittest.TestCase, 'setUpClass') and hasattr(unittest.TestCase, 'tearDownClass'): + TestCase = unittest.TestCase +else: + class TestCase(unittest.TestCase): + """ + Roughly provides setUpClass and tearDownClass functionality for older python + versions in our test scenarios. If subclasses override setUp or tearDown + they *must* call the superclass. + """ + def setUp(self): + if not hasattr(type(self), '_setup_class_count'): + type(self)._setup_class_count = len( + inspect.getmembers( + type(self), + predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_'))) + type(self).setUpClass() + + def tearDown(self): + self.assertTrue(self._setup_class_count > 0) + self._setup_class_count -= 1 + if self._setup_class_count == 0: + type(self).tearDownClass() + + +class ExampleTestCase(TestCase): + """TestCase that manages started processes""" def setUp(self): - cls = self.__class__ - if not hasattr(cls, "compat_"): - cls.compat_ = CompatSetupClass(cls) - if cls.compat_.completed: - # Last test for this class already seen. 
- raise Exception("Test sequencing error") - cls.compat_.note_setup() + super(ExampleTestCase, self).setUp() self.procs = [] def tearDown(self): for p in self.procs: p.safe_kill() - self.__class__.compat_.note_teardown() + super(ExampleTestCase, self).tearDown() def proc(self, *args, **kwargs): p = Proc(*args, **kwargs) @@ -194,27 +174,26 @@ class ExampleTestCase(unittest.TestCase): class BrokerTestCase(ExampleTestCase): """ ExampleTest that starts a broker in setUpClass and kills it in tearDownClass. + Subclasses must set `broker_exe` class variable with the name of the broker executable. """ - # setUpClass not available until 2.7 @classmethod - def setup_class(cls): + def setUpClass(cls): cls.addr = pick_addr() + "/examples" - cls.broker = Proc(["broker", "-a", cls.addr], ready="listening") + cls.broker = None # In case Proc throws, create the attribute. + cls.broker = Proc([cls.broker_exe, "-a", cls.addr], ready="listening") cls.broker.wait_ready() - # tearDownClass not available until 2.7 @classmethod - def teardown_class(cls): - cls.broker.safe_kill() + def tearDownClass(cls): + if cls.broker: cls.broker.safe_kill() def tearDown(self): + b = type(self).broker + if b and b.poll() != None: # Broker crashed + type(self).setUpClass() # Start another for the next test. + raise ProcError(b, "broker crash") super(BrokerTestCase, self).tearDown() - if not self.completed(): - b = type(self).broker - if b.poll() != None: # Broker crashed - type(self).setUpClass() # Start another for the next test. - raise ProcError(b, "broker crash") CLIENT_EXPECT="""Twas brillig, and the slithy toves => TWAS BRILLIG, AND THE SLITHY TOVES @@ -230,6 +209,8 @@ def recv_expect(name, addr): class ContainerExampleTest(BrokerTestCase): """Run the container examples, verify they behave as expected.""" + broker_exe = "broker" + def test_helloworld(self): self.assertEqual('Hello World!\n', self.proc(["helloworld", self.addr]).wait_exit()) @@ -341,8 +322,8 @@ Hello World! 
self.assertEqual(expect_found, True) -class ConnectionEngineExampleTest(BrokerTestCase): - """Run the connction_engine examples, verify they behave as expected.""" +class EngineTestCase(BrokerTestCase): + """Run selected clients to test a connction_engine broker.""" def test_helloworld(self): self.assertEqual('Hello World!\n', @@ -380,5 +361,8 @@ class ConnectionEngineExampleTest(BrokerTestCase): self.assertEqual(CLIENT_EXPECT, self.proc(["client", "-a", self.addr]).wait_exit()) +class MtBrokerTest(EngineTestCase): + broker_exe = "mt_broker" + if __name__ == "__main__": unittest.main() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/mt/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp new file mode 100644 index 0000000..48738c9 --- /dev/null +++ b/examples/cpp/mt/broker.cpp @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "../options.hpp" + +#include <proton/connection.hpp> +#include <proton/controller.hpp> +#include <proton/delivery.hpp> +#include <proton/handler.hpp> +#include <proton/url.hpp> +#include <proton/work_queue.hpp> + +#include <atomic> +#include <functional> +#include <iostream> +#include <mutex> +#include <thread> + +// Thread safe queue. +// Stores messages, notifies subscribed connections when there is data. +class queue { + public: + queue(const std::string& name) : name_(name) {} + + std::string name() const { return name_; } + + // Push a message onto the queue. + // If the queue was previously empty, notify subscribers it has messages. + // Called from receiver's connection. + void push(const proton::message &m) { + std::lock_guard<std::mutex> g(lock_); + messages_.push_back(m); + if (messages_.size() == 1) { // Non-empty, notify subscribers + for (auto cb : callbacks_) + cb(this); + callbacks_.clear(); + } + } + + // If the queue is not empty, pop a message into m and return true. + // Otherwise save callback to be called when there are messages and return false. + // Called from sender's connection. + bool pop(proton::message& m, std::function<void(queue*)> callback) { + std::lock_guard<std::mutex> g(lock_); + if (messages_.empty()) { + callbacks_.push_back(callback); + return false; + } else { + m = std::move(messages_.front()); + messages_.pop_front(); + return true; + } + } + + private: + const std::string name_; + std::mutex lock_; + std::deque<proton::message> messages_; + std::vector<std::function<void(queue*)> > callbacks_; +}; + +/// Thread safe map of queues. +class queues { + public: + queues() : next_id_(0) {} + + // Get or create the named queue. + queue* get(const std::string& name) { + std::lock_guard<std::mutex> g(lock_); + auto i = queues_.insert(queue_map::value_type(name, nullptr)).first; + if (!i->second) + i->second.reset(new queue(name)); + return i->second.get(); + } + + // Create a dynamic queue with a unique name. 
+ queue* dynamic() { + std::ostringstream os; + os << "_dynamic_" << next_id_++; + return get(os.str()); + } + + private: + typedef std::map<std::string, std::unique_ptr<queue> > queue_map; + + std::mutex lock_; + queue_map queues_; + std::atomic<uint64_t> next_id_; // Use to generate unique queue IDs. +}; + +/// Broker connection handler. Things to note: +/// +/// Each handler manages a single connection. Proton AMQP callbacks and queue +/// callbacks via proton::work_queue are serialized per-connection, so the +/// handler does not need a lock. Handlers for different connections can be +/// called concurrently. +/// +/// Senders (aka subscriptions) need some cross-thread notification:. +/// +/// - a sender that gets credit calls queue::pop() in `on_sendable()` +/// - on success it sends the message immediatly. +/// - on queue empty, the sender is added to the `blocked_` set and the queue stores a callback. +/// - when a receiver thread pushes a message, the queue calls its callbacks. +/// - the callback causes a serialized call to has_messages() which re-tries all `blocked_` senders. +/// +class broker_connection_handler : public proton::handler { + public: + broker_connection_handler(queues& qs) : queues_(qs) {} + + void on_connection_open(proton::connection& c) override { + // Create the has_messages callback for use with queue subscriptions. + // + // Note the captured and bound arguments must be thread-safe to copy, + // shared_ptr<work_queue>, and plain pointers this and q are all safe. + // + // The proton::connection object c is not thread-safe to copy. + // However when the work_queue calls this->has_messages it will be safe + // to use any proton objects associated with c again. + auto work = proton::work_queue::get(c); + has_messages_callback_ = [this, work](queue* q) { + work->push(std::bind(&broker_connection_handler::has_messages, this, q)); + }; + c.open(); // Always accept + } + + // A sender sends messages from a queue to a subscriber. 
+ void on_sender_open(proton::sender &sender) override { + queue *q = sender.source().dynamic() ? + queues_.dynamic() : queues_.get(sender.source().address()); + std::cout << "sending from " << q->name() << std::endl; + } + + // We have credit to send a message. + void on_sendable(proton::sender &s) override { + queue* q = sender_queue(s); + if (!do_send(q, s)) // Queue is empty, save ourselves in the blocked set. + blocked_.insert(std::make_pair(q, s)); + } + + // A receiver receives messages from a publisher to a queue. + void on_receiver_open(proton::receiver &receiver) override { + std::string qname = receiver.target().address(); + if (qname == "shutdown") { + std::cout << "broker shutting down" << std::endl; + // Sending to the special "shutdown" queue stops the broker. + proton::controller::get(receiver.connection()).stop( + proton::error_condition("shutdown", "stop broker")); + } else { + std::cout << "receiving to " << qname << std::endl; + } + } + + // A message is received. + void on_message(proton::delivery &d, proton::message &m) override { + std::string qname = d.receiver().target().address(); + queues_.get(qname)->push(m); + } + + void on_session_close(proton::session &session) override { + // Erase all blocked senders that belong to session. + auto predicate = [session](const proton::sender& s) { + return s.session() == session; + }; + erase_sender_if(blocked_.begin(), blocked_.end(), predicate); + } + + void on_sender_close(proton::sender &sender) override { + // Erase sender from the blocked set. + auto range = blocked_.equal_range(sender_queue(sender)); + auto predicate = [sender](const proton::sender& s) { return s == sender; }; + erase_sender_if(range.first, range.second, predicate); + } + + // The controller calls on_transport_close() last. + void on_transport_close(proton::transport&) override { + delete this; // All done. 
+ } + + private: + typedef std::multimap<queue*, proton::sender> blocked_map; + + // Get the queue associated with a sender. + queue* sender_queue(const proton::sender& s) { + return queues_.get(s.source().address()); // Thread safe. + } + + // Only called if we have credit. Return true if we sent a message. + bool do_send(queue* q, proton::sender &s) { + proton::message m; + bool popped = q->pop(m, has_messages_callback_); + if (popped) + s.send(m); + /// if !popped the queue has saved the callback for later. + return popped; + } + + // Called via @ref work_queue when q has messages. Try all the blocked senders. + void has_messages(queue* q) { + auto range = blocked_.equal_range(q); + for (auto i = range.first; i != range.second;) { + if (i->second.credit() <= 0 || do_send(q, i->second)) + i = blocked_.erase(i); // No credit or send was successful, stop blocked. + else + ++i; // have credit, didn't send, keep blocked + } + } + + // Use to erase closed senders from blocked_ set. + template <class Predicate> + void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) { + for (auto i = begin; i != end; ) { + if (p(i->second)) + i = blocked_.erase(i); + else + ++i; + } + } + + queues& queues_; + blocked_map blocked_; + std::function<void(queue*)> has_messages_callback_; + proton::connection connection_; +}; + + +class broker { + public: + broker(const std::string addr) : controller_(proton::controller::create()) { + controller_->options(proton::connection_options().container_id("mt_broker")); + std::cout << "broker listening on " << addr << std::endl; + controller_->listen(addr, std::bind(&broker::new_handler, this)); + } + + void run() { + for(int i = 0; i < std::thread::hardware_concurrency(); ++i) + std::thread(&proton::controller::run, controller_.get()).detach(); + controller_->wait(); + } + + private: + proton::handler* new_handler() { + return new broker_connection_handler(queues_); + } + + queues queues_; + 
std::unique_ptr<proton::controller> controller_; +}; + +int main(int argc, char **argv) { + // Command line options + std::string address("0.0.0.0"); + example::options opts(argc, argv); + opts.add_value(address, 'a', "address", "listen on URL", "URL"); + try { + opts.parse(); + broker(address).run(); + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << "broker shutdown: " << e.what() << std::endl; + } + return 1; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
