http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/flow_control.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/flow_control.cpp b/cpp/examples/flow_control.cpp new file mode 100644 index 0000000..c74070c --- /dev/null +++ b/cpp/examples/flow_control.cpp @@ -0,0 +1,261 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/listen_handler.hpp> +#include <proton/listener.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver_options.hpp> +#include <proton/sender.hpp> +#include <proton/tracker.hpp> + +#include <iostream> +#include <sstream> + +#include "fake_cpp11.hpp" + +namespace { + +bool verbose = true; + +void verify(bool success, const std::string &msg) { + if (!success) + throw std::runtime_error("example failure:" + msg); + else { + std::cout << "success: " << msg << std::endl; + if (verbose) std::cout << std::endl; + } +} + +} + +// flow_sender manages the incoming connection and acts as the message sender. +class flow_sender : public proton::messaging_handler { + private: + int available; // Number of messages the sender may send assuming sufficient credit. + int sequence; + + public: + flow_sender() : available(0), sequence(0) {} + + void send_available_messages(proton::sender &s) { + for (int i = sequence; available && s.credit() > 0; i++) { + std::ostringstream mbody; + mbody << "flow_sender message " << sequence++; + proton::message m(mbody.str()); + s.send(m); + available--; + } + } + + void on_sendable(proton::sender &s) OVERRIDE { + if (verbose) + std::cout << "flow_sender in \"on_sendable\" with credit " << s.credit() + << " and " << available << " available messages" << std::endl; + send_available_messages(s); + } + + void on_sender_drain_start(proton::sender &s) OVERRIDE { + if (verbose) + std::cout << "flow_sender in \"on_drain_start\" with credit " << s.credit() + << " and " << available << " available messages" << std::endl; + send_available_messages(s); + if (s.credit()) { + s.return_credit(); // return the rest + } + } + + void set_available(int n) { available = n; } +}; + +class flow_receiver : public 
proton::messaging_handler { + public: + int stage; + int received; + flow_sender &sender; + + flow_receiver(flow_sender &s) : stage(0), received(0), sender(s) {} + + void example_setup(int n) { + received = 0; + sender.set_available(n); + } + + void run_stage(proton::receiver &r, const std::string &caller) { + // Serialize the progression of the flow control examples. + switch (stage) { + case 0: + if (verbose) std::cout << "Example 1. Simple use of credit." << std::endl; + // TODO: add timeout callbacks, show no messages until credit. + example_setup(2); + r.add_credit(2); + break; + case 1: + if (r.credit() > 0) return; + verify(received == 2, "Example 1: simple credit"); + + if (verbose) std::cout << "Example 2. Use basic drain, sender has 3 \"immediate\" messages." << std::endl; + example_setup(3); + r.add_credit(5); // ask for up to 5 + r.drain(); // but only use what's available + break; + case 2: + if (caller == "on_message") return; + if (caller == "on_receiver_drain_finish") { + // Note that unused credit of 2 at sender is returned and is now 0. + verify(received == 3 && r.credit() == 0, "Example 2: basic drain"); + + if (verbose) std::cout << "Example 3. Drain use with no credit." << std::endl; + example_setup(0); + r.drain(); + break; + } + verify(false, "example 2 run_stage"); + return; + + case 3: + verify(caller == "on_receiver_drain_finish" && received == 0, "Example 3: drain without credit"); + + if (verbose) std::cout << "Example 4. Show using high(10)/low(3) watermark for 25 messages." << std::endl; + example_setup(25); + r.add_credit(10); + break; + + case 4: + if (received < 25) { + // Top up credit as needed. 
+ uint32_t credit = r.credit(); + if (credit <= 3) { + uint32_t new_credit = 10; + uint32_t remaining = 25 - received; + if (new_credit > remaining) + new_credit = remaining; + if (new_credit > credit) { + r.add_credit(new_credit - credit); + if (verbose) + std::cout << "flow_receiver adding credit for " << new_credit - credit + << " messages" << std::endl; + } + } + return; + } + + verify(received == 25 && r.credit() == 0, "Example 4: high/low watermark"); + r.connection().close(); + break; + + default: + throw std::runtime_error("run_stage sequencing error"); + } + stage++; + } + + void on_receiver_open(proton::receiver &r) OVERRIDE { + run_stage(r, "on_receiver_open"); + } + + void on_message(proton::delivery &d, proton::message &m) OVERRIDE { + if (verbose) + std::cout << "flow_receiver in \"on_message\" with " << m.body() << std::endl; + proton::receiver r(d.receiver()); + received++; + run_stage(r, "on_message"); + } + + void on_receiver_drain_finish(proton::receiver &r) OVERRIDE { + if (verbose) + std::cout << "flow_receiver in \"on_receiver_drain_finish\"" << std::endl; + run_stage(r, "on_receiver_drain_finish"); + } +}; + +class flow_listener : public proton::listen_handler { + proton::connection_options opts; + public: + flow_listener(flow_sender& sh) { + opts.handler(sh); + } + + void on_open(proton::listener& l) OVERRIDE { + std::ostringstream url; + url << "//:" << l.port() << "/example"; // Connect to the actual listening port + l.container().connect(url.str()); + } + + proton::connection_options on_accept(proton::listener&) OVERRIDE { return opts; } +}; + +class flow_control : public proton::messaging_handler { + private: + proton::listener listener; + flow_sender send_handler; + flow_receiver receive_handler; + flow_listener listen_handler; + + public: + flow_control() : receive_handler(send_handler), listen_handler(send_handler) {} + + void on_container_start(proton::container &c) OVERRIDE { + // Listen on a dynamic port on the local host. 
+ listener = c.listen("//:0", listen_handler); + } + + void on_connection_open(proton::connection &c) OVERRIDE { + if (c.active()) { + // outbound connection + c.open_receiver("flow_example", proton::receiver_options().handler(receive_handler).credit_window(0)); + } + } + + void on_connection_close(proton::connection &) OVERRIDE { + listener.stop(); + } +}; + +int main(int argc, char **argv) { + // Pick an "unusual" port since we are going to be talking to + // ourselves, not a broker. + bool quiet = false; + + example::options opts(argc, argv); + opts.add_flag(quiet, 'q', "quiet", "suppress additional commentary of credit allocation and consumption"); + + try { + opts.parse(); + if (quiet) + verbose = false; + + flow_control fc; + proton::container(fc).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/helloworld.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/helloworld.cpp b/cpp/examples/helloworld.cpp new file mode 100644 index 0000000..5962826 --- /dev/null +++ b/cpp/examples/helloworld.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/tracker.hpp> + +#include <iostream> + +#include "fake_cpp11.hpp" + +class hello_world : public proton::messaging_handler { + std::string conn_url_; + std::string addr_; + + public: + hello_world(const std::string& u, const std::string& a) : + conn_url_(u), addr_(a) {} + + void on_container_start(proton::container& c) OVERRIDE { + c.connect(conn_url_); + } + + void on_connection_open(proton::connection& c) OVERRIDE { + c.open_receiver(addr_); + c.open_sender(addr_); + } + + void on_sendable(proton::sender &s) OVERRIDE { + proton::message m("Hello World!"); + s.send(m); + s.close(); + } + + void on_message(proton::delivery &d, proton::message &m) OVERRIDE { + std::cout << m.body() << std::endl; + d.connection().close(); + } +}; + +int main(int argc, char **argv) { + try { + std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; + std::string addr = argc > 2 ? argv[2] : "examples"; + + hello_world hw(conn_url, addr); + proton::container(hw).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/message_properties.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/message_properties.cpp b/cpp/examples/message_properties.cpp new file mode 100644 index 0000000..cb5c6b8 --- /dev/null +++ b/cpp/examples/message_properties.cpp @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/message.hpp> +#include <proton/types.hpp> +#include <iostream> +#include <map> + +int main(int argc, char **argv) { + try { + proton::message m; + + // Setting properties: legal types are converted automatically to their + // AMQP counterpart. + m.properties().put("short", int16_t(123)); + m.properties().put("string", "foo"); + m.properties().put("symbol", proton::symbol("sym")); + + // Examining properties using proton::get() + + // 1 argument get<>() template specifies expected type of property. + std::string s = proton::get<std::string>(m.properties().get("string")); + + // 2 argument get, property must have matching type to output argument. + int16_t i; + proton::get(m.properties().get("short"), i); + + // Checking property types + proton::type_id type = m.properties().get("symbol").type(); + if (type != proton::SYMBOL) { + throw std::logic_error("wrong type!"); + } + + // proton::scalar has its own ostream << + std::cout << "using put/get:" + << " short=" << i + << " string=" << s + << " symbol=" << m.properties().get("symbol") + << std::endl; + + // Converting properties to a convertible type + std::cout << "using coerce:" + << " short(as long)=" + << proton::coerce<long>(m.properties().get("short")) + << std::endl; + + // Extract the properties as a std::map for more complex map operations. 
+ // You can use other map and sequence types to hold a map, see @ref types_page + typedef std::map<std::string, proton::scalar> property_map; + property_map props; + proton::get(m.properties(), props); + for (property_map::iterator i = props.begin(); i != props.end(); ++i) { + std::cout << "props[" << i->first << "]=" << i->second << std::endl; + } + props["string"] = "bar"; + props["short"] = 42; + // Update the properties in the message from the modified props map + m.properties() = props; + + std::cout << "short=" << m.properties().get("short") + << " string=" << m.properties().get("string") + << std::endl; + + // proton::get throws an exception if types do not match exactly. + try { + proton::get<uint32_t>(m.properties().get("short")); // bad: uint32_t != int16_t + throw std::logic_error("expected exception"); + } catch (const proton::conversion_error& e) { + std::cout << "expected conversion_error: \"" << e.what() << '"' << std::endl; + } + + // proton::coerce throws an exception if types are not convertible. + try { + proton::get<uint32_t>(m.properties().get("string")); // bad: string to uint32_t + throw std::logic_error("expected exception"); + } catch (const proton::conversion_error& e) { + std::cout << "expected conversion_error: \"" << e.what() << '"' << std::endl; + } + + return 0; + } catch (const std::exception& e) { + std::cerr << "unexpected exception: " << e.what() << std::endl; + return 1; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/multithreaded_client.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/multithreaded_client.cpp b/cpp/examples/multithreaded_client.cpp new file mode 100644 index 0000000..78085e2 --- /dev/null +++ b/cpp/examples/multithreaded_client.cpp @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// +// C++11 or greater +// +// A multi-threaded client that calls proton::container::run() in one thread, sends +// messages in another and receives messages in a third. +// +// Note this client does not deal with flow-control. If the sender is faster +// than the receiver, messages will build up in memory on the sending side. +// See @ref multithreaded_client_flow_control.cpp for a more complex example with +// flow control. +// +// NOTE: no proper error handling + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver.hpp> +#include <proton/sender.hpp> +#include <proton/work_queue.hpp> + +#include <condition_variable> +#include <iostream> +#include <mutex> +#include <queue> +#include <sstream> +#include <string> +#include <thread> + +// Lock output from threads to avoid scrambling +std::mutex out_lock; +#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false) + +// Handler for a single thread-safe sending and receiving connection. 
+class client : public proton::messaging_handler { + // Invariant + const std::string url_; + const std::string address_; + + // Only used in proton handler thread + proton::sender sender_; + + // Shared by proton and user threads, protected by lock_ + std::mutex lock_; + proton::work_queue *work_queue_; + std::condition_variable sender_ready_; + std::queue<proton::message> messages_; + std::condition_variable messages_ready_; + + public: + client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {} + + // Thread safe + void send(const proton::message& msg) { + // Use [=] to copy the message, we cannot pass it by reference since it + // will be used in another thread. + work_queue()->add([=]() { sender_.send(msg); }); + } + + // Thread safe + proton::message receive() { + std::unique_lock<std::mutex> l(lock_); + while (messages_.empty()) messages_ready_.wait(l); + auto msg = std::move(messages_.front()); + messages_.pop(); + return msg; + } + + // Thread safe + void close() { + work_queue()->add([=]() { sender_.connection().close(); }); + } + + private: + + proton::work_queue* work_queue() { + // Wait till work_queue_ and sender_ are initialized. + std::unique_lock<std::mutex> l(lock_); + while (!work_queue_) sender_ready_.wait(l); + return work_queue_; + } + + // == messaging_handler overrides, only called in proton handler thread + + // Note: this example creates a connection when the container starts. + // To create connections after the container has started, use + // container::connect(). + // See @ref multithreaded_client_flow_control.cpp for an example. 
+ void on_container_start(proton::container& cont) override { + cont.connect(url_); + } + + void on_connection_open(proton::connection& conn) override { + conn.open_sender(address_); + conn.open_receiver(address_); + } + + void on_sender_open(proton::sender& s) override { + // sender_ and work_queue_ must be set atomically + std::lock_guard<std::mutex> l(lock_); + sender_ = s; + work_queue_ = &s.work_queue(); + sender_ready_.notify_all(); + } + + void on_message(proton::delivery& dlv, proton::message& msg) override { + std::lock_guard<std::mutex> l(lock_); + messages_.push(msg); + messages_ready_.notify_all(); + } + + void on_error(const proton::error_condition& e) override { + OUT(std::cerr << "unexpected error: " << e << std::endl); + exit(1); + } +}; + +int main(int argc, const char** argv) { + try { + if (argc != 4) { + std ::cerr << + "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n" + "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n" + "AMQP-ADDRESS: AMQP node address, e.g. 
'examples'\n" + "MESSAGE-COUNT: number of messages to send\n"; + return 1; + } + const char *url = argv[1]; + const char *address = argv[2]; + int n_messages = atoi(argv[3]); + + client cl(url, address); + proton::container container(cl); + std::thread container_thread([&]() { container.run(); }); + + std::thread sender([&]() { + for (int i = 0; i < n_messages; ++i) { + proton::message msg(std::to_string(i + 1)); + cl.send(msg); + OUT(std::cout << "sent \"" << msg.body() << '"' << std::endl); + } + }); + + int received = 0; + std::thread receiver([&]() { + for (int i = 0; i < n_messages; ++i) { + auto msg = cl.receive(); + OUT(std::cout << "received \"" << msg.body() << '"' << std::endl); + ++received; + } + }); + + sender.join(); + receiver.join(); + cl.close(); + container_thread.join(); + std::cout << received << " messages sent and received" << std::endl; + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/multithreaded_client_flow_control.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/multithreaded_client_flow_control.cpp b/cpp/examples/multithreaded_client_flow_control.cpp new file mode 100644 index 0000000..93c6b3d --- /dev/null +++ b/cpp/examples/multithreaded_client_flow_control.cpp @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// +// C++11 or greater +// +// A multi-threaded client that sends and receives messages from multiple AMQP +// addresses. +// +// Demonstrates how to: +// +// - implement proton handlers that interact with user threads safely +// - block sender threads to respect AMQP flow control +// - use AMQP flow control to limit message buffering for receivers threads +// +// We define sender and receiver classes with simple, thread-safe blocking +// send() and receive() functions. +// +// These classes are also privately proton::message_handler instances. They use +// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex +// etc.) to pass messages between user and proton::container threads. +// +// NOTE: no proper error handling + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver.hpp> +#include <proton/receiver_options.hpp> +#include <proton/sender.hpp> +#include <proton/work_queue.hpp> + +#include <atomic> +#include <condition_variable> +#include <iostream> +#include <mutex> +#include <queue> +#include <sstream> +#include <string> +#include <thread> + +// Lock output from threads to avoid scrambling +std::mutex out_lock; +#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false) + +// A thread-safe sending connection that blocks sending threads when there +// is no AMQP credit to send messages. 
+class sender : private proton::messaging_handler { + // Only used in proton handler thread + proton::sender sender_; + + // Shared by proton and user threads, protected by lock_ + std::mutex lock_; + proton::work_queue *work_queue_; + std::condition_variable sender_ready_; + int queued_; // Queued messages waiting to be sent + int credit_; // AMQP credit - number of messages we can send + + public: + sender(proton::container& cont, const std::string& url, const std::string& address) + : work_queue_(0), queued_(0), credit_(0) + { + cont.open_sender(url+"/"+address, proton::connection_options().handler(*this)); + } + + // Thread safe + void send(const proton::message& m) { + { + std::unique_lock<std::mutex> l(lock_); + // Don't queue up more messages than we have credit for + while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l); + ++queued_; + } + work_queue_->add([=]() { this->do_send(m); }); // work_queue_ is thread safe + } + + // Thread safe + void close() { + work_queue()->add([=]() { sender_.connection().close(); }); + } + + private: + + proton::work_queue* work_queue() { + // Wait till work_queue_ and sender_ are initialized. 
+ std::unique_lock<std::mutex> l(lock_); + while (!work_queue_) sender_ready_.wait(l); + return work_queue_; + } + + // == messaging_handler overrides, only called in proton handler thread + + void on_sender_open(proton::sender& s) override { + // Make sure sender_ and work_queue_ are set atomically + std::lock_guard<std::mutex> l(lock_); + sender_ = s; + work_queue_ = &s.work_queue(); + } + + void on_sendable(proton::sender& s) override { + std::lock_guard<std::mutex> l(lock_); + credit_ = s.credit(); + sender_ready_.notify_all(); // Notify senders we have credit + } + + // work_queue work items is are automatically dequeued and called by proton + // This function is called because it was queued by send() + void do_send(const proton::message& m) { + sender_.send(m); + std::lock_guard<std::mutex> l(lock_); + --queued_; // work item was consumed from the work_queue + credit_ = sender_.credit(); // update credit + sender_ready_.notify_all(); // Notify senders we have space on queue + } + + void on_error(const proton::error_condition& e) override { + OUT(std::cerr << "unexpected error: " << e << std::endl); + exit(1); + } +}; + +// A thread safe receiving connection that blocks receiving threads when there +// are no messages available, and maintains a bounded buffer of incoming +// messages by issuing AMQP credit only when there is space in the buffer. 
+class receiver : private proton::messaging_handler { + static const size_t MAX_BUFFER = 100; // Max number of buffered messages + + // Used in proton threads only + proton::receiver receiver_; + + // Used in proton and user threads, protected by lock_ + std::mutex lock_; + proton::work_queue* work_queue_; + std::queue<proton::message> buffer_; // Messages not yet returned by receive() + std::condition_variable can_receive_; // Notify receivers of messages + + public: + + // Connect to url + receiver(proton::container& cont, const std::string& url, const std::string& address) + : work_queue_() + { + // NOTE:credit_window(0) disables automatic flow control. + // We will use flow control to match AMQP credit to buffer capacity. + cont.open_receiver(url+"/"+address, proton::receiver_options().credit_window(0), + proton::connection_options().handler(*this)); + } + + // Thread safe receive + proton::message receive() { + std::unique_lock<std::mutex> l(lock_); + // Wait for buffered messages + while (!work_queue_ || buffer_.empty()) + can_receive_.wait(l); + proton::message m = std::move(buffer_.front()); + buffer_.pop(); + // Add a lambda to the work queue to call receive_done(). + // This will tell the handler to add more credit. + work_queue_->add([=]() { this->receive_done(); }); + return m; + } + + void close() { + std::lock_guard<std::mutex> l(lock_); + if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); }); + } + + private: + // ==== The following are called by proton threads only. 
+ + void on_receiver_open(proton::receiver& r) override { + receiver_ = r; + std::lock_guard<std::mutex> l(lock_); + work_queue_ = &receiver_.work_queue(); + receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit + } + + void on_message(proton::delivery &d, proton::message &m) override { + // Proton automatically reduces credit by 1 before calling on_message + std::lock_guard<std::mutex> l(lock_); + buffer_.push(m); + can_receive_.notify_all(); + } + + // called via work_queue + void receive_done() { + // Add 1 credit, a receiver has taken a message out of the buffer. + receiver_.add_credit(1); + } + + void on_error(const proton::error_condition& e) override { + OUT(std::cerr << "unexpected error: " << e << std::endl); + exit(1); + } +}; + +// ==== Example code using the sender and receiver + +// Send n messages +void send_thread(sender& s, int n) { + auto id = std::this_thread::get_id(); + for (int i = 0; i < n; ++i) { + std::ostringstream ss; + ss << std::this_thread::get_id() << "-" << i; + s.send(proton::message(ss.str())); + OUT(std::cout << id << " sent \"" << ss.str() << '"' << std::endl); + } + OUT(std::cout << id << " sent " << n << std::endl); +} + +// Receive messages till atomic remaining count is 0. +// remaining is shared among all receiving threads +void receive_thread(receiver& r, std::atomic_int& remaining) { + auto id = std::this_thread::get_id(); + int n = 0; + // atomically check and decrement remaining *before* receiving. + // If it is 0 or less then return, as there are no more + // messages to receive so calling r.receive() would block forever. 
+ while (remaining-- > 0) { + auto m = r.receive(); + ++n; + OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl); + } + OUT(std::cout << id << " received " << n << " messages" << std::endl); +} + +int main(int argc, const char **argv) { + try { + if (argc != 5) { + std::cerr << + "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n" + "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n" + "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n" + "MESSAGE-COUNT: number of messages to send\n" + "THREAD-COUNT: number of sender/receiver thread pairs\n"; + return 1; + } + + const char *url = argv[1]; + const char *address = argv[2]; + int n_messages = atoi(argv[3]); + int n_threads = atoi(argv[4]); + int count = n_messages * n_threads; + + // Total messages to be received, multiple receiver threads will decrement this. + std::atomic_int remaining; + remaining.store(count); + + // Run the proton container + proton::container container; + auto container_thread = std::thread([&]() { container.run(); }); + + // A single sender and receiver to be shared by all the threads + sender send(container, url, address); + receiver recv(container, url, address); + + // Start receiver threads, then sender threads. + // Starting receivers first gives all receivers a chance to compete for messages. 
+ std::vector<std::thread> threads; + threads.reserve(n_threads*2); // Avoid re-allocation once threads are started + for (int i = 0; i < n_threads; ++i) + threads.push_back(std::thread([&]() { receive_thread(recv, remaining); })); + for (int i = 0; i < n_threads; ++i) + threads.push_back(std::thread([&]() { send_thread(send, n_messages); })); + + // Wait for threads to finish + for (auto& t : threads) t.join(); + send.close(); + recv.close(); + container_thread.join(); + if (remaining > 0) + throw std::runtime_error("not all messages were received"); + std::cout << count << " messages sent and received" << std::endl; + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/options.hpp ---------------------------------------------------------------------- diff --git a/cpp/examples/options.hpp b/cpp/examples/options.hpp new file mode 100644 index 0000000..dab1bc2 --- /dev/null +++ b/cpp/examples/options.hpp @@ -0,0 +1,175 @@ +#ifndef OPTIONS_HPP +#define OPTIONS_HPP +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
// + */

#include <string>
#include <sstream>
#include <ostream>
#include <vector>
#include <stdexcept>

namespace example {
/** bad_option is thrown for option parsing errors */
struct bad_option : public std::runtime_error {
    bad_option(const std::string& s) : std::runtime_error(s) {}
};

/**
 * Simple command-line option parser for example programs.
 *
 * Options are bound by reference to caller-owned variables, so those
 * variables must stay alive while parse() or the usage printer may run.
 * The constructor always registers a -h/--help flag; parse() throws
 * bad_option("") when that flag was given.
 */
class options {
  public:

    /**
     * Store the command line and derive the program name from argv[0].
     * With an empty argument vector (argc <= 0 or a null argv[0]) the program
     * name is empty rather than being built from a null pointer.
     */
    options(int argc, char const * const * argv) :
        argc_(argc), argv_(argv), prog_(argc > 0 && argv && argv[0] ? argv[0] : ""), help_()
    {
        std::string::size_type slash = prog_.find_last_of("/\\");
        if (slash != std::string::npos)
            prog_ = prog_.substr(slash+1); // Extract prog name from path
        add_flag(help_, 'h', "help", "Print the help message");
    }

    /** Release every option object created by add_value()/add_flag(). */
    ~options() {
        for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i)
            delete *i;
    }

    /** Updates value when parse() is called if option is present with a value. */
    template<class T>
    void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string& var) {
        opts_.push_back(new option_value<T>(value, short_name, long_name, description, var));
    }

    /** Sets flag when parse() is called if option is present. The flag is reset to false on registration. */
    void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) {
        opts_.push_back(new option_flag(flag, short_name, long_name, description));
    }

    /** Parse the command line, return the index of the first non-option argument.
     *@throws bad_option if there is a parsing error or unknown option.
     */
    int parse() {
        int arg = 1;
        for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) {
            // Offer the argument to each registered option; the first one that accepts it wins.
            opts::iterator i = opts_.begin();
            while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg))
                ++i;
            if (i == opts_.end())
                throw bad_option(std::string("unknown option ") + argv_[arg]);
        }
        // An empty message lets callers print just the usage text for --help.
        if (help_) throw bad_option("");
        return arg;
    }

    /** Print a usage message */
    friend std::ostream& operator<<(std::ostream& os, const options& op) {
        os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl;
        os << std::endl << "options:" << std::endl;
        for (opts::const_iterator i = op.opts_.begin(); i != op.opts_.end(); ++i)
            os << **i << std::endl;
        return os;
    }

  private:
    // The parser owns its option objects through raw pointers and deletes
    // them in the destructor, so a copy would delete them twice. Copying is
    // therefore disabled (declared private, never defined).
    options(const options&);
    options& operator=(const options&);

    /** Common base of all options: holds the names and help text, prints one usage entry. */
    class option {
      public:
        option(char s, const std::string& l, const std::string& d, const std::string& v) :
            short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {}
        virtual ~option() {}

        /** Try to consume argv[i]; return true if this option matched (i may be advanced past a value). */
        virtual bool parse(int argc, char const * const * argv, int &i) = 0;
        virtual void print_default(std::ostream&) const {}

        friend std::ostream& operator<<(std::ostream& os, const option& op) {
            os << " " << op.short_;
            if (!op.var_.empty()) os << " " << op.var_;
            os << ", " << op.long_;
            if (!op.var_.empty()) os << "=" << op.var_;
            os << std::endl << " " << op.desc_;
            op.print_default(os);
            return os;
        }

      protected:
        std::string short_, long_, desc_, var_;
    };

    /** Option that takes a value, written as "-x VALUE", "--name VALUE" or "--name=VALUE". */
    template <class T>
    class option_value : public option {
      public:
        option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) :
            option(s, l, d, v), value_(value) {}

        bool parse(int argc, char const * const * argv, int &i) {
            std::string arg(argv[i]);
            if (arg == short_ || arg == long_) {
                if (i < argc-1) {
                    set_value(arg, argv[++i]);
                    return true;
                } else {
                    throw bad_option("missing value for " + arg);
                }
            }
            // "--name=VALUE": the prefix must be the long name followed directly by '='.
            if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) {
                set_value(long_, arg.substr(long_.size()+1));
                return true;
            }
            return false;
        }

        virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; }

        /** Convert s with operator>> and store it; throws bad_option if the extraction fails. */
        void set_value(const std::string& opt, const std::string& s) {
            std::istringstream is(s);
            is >> value_;
            if (is.fail()) // fail() is also true when badbit is set
                throw bad_option("bad value for " + opt + ": " + s);
        }

      private:
        T& value_;
    };

    /** Boolean option without a value; cleared on registration, set when present. */
    class option_flag: public option {
      public:
        option_flag(bool& flag, const char s, const std::string& l, const std::string& d) :
            option(s, l, d, ""), flag_(flag)
        { flag_ = false; }

        bool parse(int /*argc*/, char const * const * argv, int &i) {
            if (argv[i] == short_ || argv[i] == long_) {
                flag_ = true;
                return true;
            } else {
                return false;
            }
        }

      private:
        bool &flag_;
    };

    typedef std::vector<option*> opts;

    int argc_;
    char const * const * argv_;
    std::string prog_;
    opts opts_;
    bool help_;
};
}

#endif // OPTIONS_HPP

// http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/queue_browser.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/queue_browser.cpp b/cpp/examples/queue_browser.cpp new file mode 100644 index 0000000..b306e76 --- /dev/null +++ b/cpp/examples/queue_browser.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver_options.hpp> +#include <proton/source_options.hpp> + +#include <iostream> + +#include "fake_cpp11.hpp" + +class queue_browser : public proton::messaging_handler { + std::string conn_url_; + std::string addr_; + + public: + queue_browser(const std::string& u, const std::string& a) : + conn_url_(u), addr_(a) {} + + void on_container_start(proton::container& c) OVERRIDE { + proton::receiver_options ropts; + proton::source_options sopts; + ropts.source(sopts.distribution_mode(proton::source::COPY)); + + proton::connection conn = c.connect(conn_url_); + conn.open_receiver(addr_, ropts); + } + + void on_message(proton::delivery&, proton::message& m) OVERRIDE { + std::cout << m.body() << std::endl; + } +}; + +int main(int argc, char** argv) { + try { + std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; + std::string addr = argc > 2 ? argv[2] : "examples"; + + queue_browser qb(conn_url, addr); + proton::container(qb).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/reconnect_client.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/reconnect_client.cpp b/cpp/examples/reconnect_client.cpp new file mode 100644 index 0000000..ed93214 --- /dev/null +++ b/cpp/examples/reconnect_client.cpp @@ -0,0 +1,143 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/link.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/reconnect_options.hpp> +#include <proton/value.hpp> +#include <proton/types.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include "fake_cpp11.hpp" + +class reconnect_client : public proton::messaging_handler { + std::string url; + std::string address; + std::vector<std::string> failovers; + proton::sender sender; + int sent; + int expected; + int received; + + public: + reconnect_client(const std::string &u, const std::string& a, int c, const std::vector<std::string>& f) : + url(u), address(a), failovers(f), sent(0), expected(c), received(0) {} + + private: + void on_container_start(proton::container &c) OVERRIDE { + proton::connection_options co; + proton::reconnect_options ro; + + ro.failover_urls(failovers); + co.reconnect(ro); + c.connect(url, co); + } + + void on_connection_open(proton::connection & c) OVERRIDE { + c.open_receiver(address); + c.open_sender(address); + // reconnect we probably lost the last message sent + sent = received; + std::cout << "simple_recv listening on " << url << std::endl; + } + + void 
on_message(proton::delivery &d, proton::message &msg) OVERRIDE { + if (proton::coerce<int>(msg.id()) < received) { + return; // Ignore duplicate + } + + if (expected == 0 || received < expected) { + std::cout << msg.body() << std::endl; + received++; + + if (received == expected) { + d.receiver().close(); + sender.close(); + d.connection().close(); + } else { + // See if we can send any messages now + send(sender); + } + } + } + + void send(proton::sender& s) { + // Only send with credit and only allow one outstanding message + while (s.credit() && sent < received+1) { + std::map<std::string, int> m; + m["sequence"] = sent + 1; + + proton::message msg; + msg.id(sent + 1); + msg.body(m); + + std::cout << "Sending: " << sent+1 << std::endl; + s.send(msg); + sent++; + } + } + + void on_sender_open(proton::sender & s) OVERRIDE { + sender = s; + } + + void on_sendable(proton::sender &s) OVERRIDE { + send(s); + } +}; + +int main(int argc, const char** argv) { + try { + if (argc < 4) { + std ::cerr << + "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT FAILOVER-URL...\n" + "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n" + "AMQP-ADDRESS: AMQP node address, e.g. 
'examples'\n" + "MESSAGE-COUNT: number of messages to receive\n" + "FAILOVER_URL...: zero or more failover urls\n"; + return 1; + } + const char *url = argv[1]; + const char *address = argv[2]; + int message_count = atoi(argv[3]); + std::vector<std::string> failovers(&argv[4], &argv[argc]); + + reconnect_client client(url, address, message_count, failovers); + proton::container(client).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/scheduled_send.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/scheduled_send.cpp b/cpp/examples/scheduled_send.cpp new file mode 100644 index 0000000..3244540 --- /dev/null +++ b/cpp/examples/scheduled_send.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "options.hpp" + +#include <proton/container.hpp> +#include <proton/connection.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/sender.hpp> +#include <proton/tracker.hpp> +#include <proton/work_queue.hpp> + +#include <iostream> + +#include "fake_cpp11.hpp" + +// Send messages at a constant rate one per interval. cancel after a timeout. +class scheduled_sender : public proton::messaging_handler { + private: + std::string url; + proton::sender sender; + proton::duration interval, timeout; + proton::work_queue* work_queue; + bool ready, canceled; + + public: + + scheduled_sender(const std::string &s, double d, double t) : + url(s), + interval(int(d*proton::duration::SECOND.milliseconds())), // Send interval. + timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout. + work_queue(0), + ready(true), // Ready to send. + canceled(false) // Canceled. + {} + + // The awkward looking double lambda is necessary because the scheduled lambdas run in the container context + // and must arrange lambdas for send and close to happen in the connection context. + void on_container_start(proton::container &c) OVERRIDE { + c.open_sender(url); + } + + void on_sender_open(proton::sender &s) OVERRIDE { + sender = s; + work_queue = &s.work_queue(); + // Call this->cancel after timeout. + s.container().schedule(timeout, [this]() { this->work_queue->add( [this]() { this->cancel(); }); }); + // Start regular ticks every interval. + s.container().schedule(interval, [this]() { this->work_queue->add( [this]() { this->tick(); }); }); + } + + void cancel() { + canceled = true; + sender.connection().close(); + } + + void tick() { + // Schedule the next tick unless we have been cancelled. 
+ if (!canceled) + sender.container().schedule(interval, [this]() { this->work_queue->add( [this]() { this->tick(); }); }); + if (sender.credit() > 0) // Only send if we have credit + send(); + else + ready = true; // Set the ready flag, send as soon as we get credit. + } + + void on_sendable(proton::sender &) OVERRIDE { + if (ready) // We have been ticked since the last send. + send(); + } + + void send() { + std::cout << "send" << std::endl; + sender.send(proton::message("ping")); + ready = false; + } +}; + + +int main(int argc, char **argv) { + std::string address("127.0.0.1:5672/examples"); + double interval = 1.0; + double timeout = 5.0; + + example::options opts(argc, argv); + + opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); + opts.add_value(interval, 'i', "interval", "send a message every INTERVAL seconds", "INTERVAL"); + opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T"); + + try { + opts.parse(); + scheduled_sender h(address, interval, timeout); + proton::container(h).run(); + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/scheduled_send_03.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/scheduled_send_03.cpp b/cpp/examples/scheduled_send_03.cpp new file mode 100644 index 0000000..9050429 --- /dev/null +++ b/cpp/examples/scheduled_send_03.cpp @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/container.hpp> +#include <proton/connection.hpp> +#include <proton/duration.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/sender.hpp> +#include <proton/tracker.hpp> +#include <proton/work_queue.hpp> + +#include <iostream> + +#include "fake_cpp11.hpp" + +// Send messages at a constant rate one per interval. cancel after a timeout. +// This example uses only C++03 features. +class scheduled_sender : public proton::messaging_handler { + private: + std::string url; + proton::duration interval, timeout; + proton::work_queue *work_queue; + bool ready, canceled; + + public: + scheduled_sender(const std::string &s, double d, double t) : + url(s), + interval(int(d*proton::duration::SECOND.milliseconds())), // Send interval. + timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout. + work_queue(0), + ready(true), // Ready to send. + canceled(false) // Canceled. 
+ {} + + void on_container_start(proton::container &c) OVERRIDE { + c.open_sender(url); + } + + void on_sender_open(proton::sender & s) OVERRIDE { + work_queue = &s.work_queue(); + + work_queue->schedule(timeout, make_work(&scheduled_sender::cancel, this, s)); + work_queue->schedule(interval, make_work(&scheduled_sender::tick, this, s)); + } + + void cancel(proton::sender sender) { + canceled = true; + sender.connection().close(); + } + + void tick(proton::sender sender) { + if (!canceled) { + work_queue->schedule(interval, make_work(&scheduled_sender::tick, this, sender)); // Next tick + if (sender.credit() > 0) // Only send if we have credit + send(sender); + else + ready = true; // Set the ready flag, send as soon as we get credit. + } + } + + void on_sendable(proton::sender &sender) OVERRIDE { + if (ready) // We have been ticked since the last send. + send(sender); + } + + void send(proton::sender& sender) { + std::cout << "send" << std::endl; + sender.send(proton::message("ping")); + ready = false; + } +}; + + +int main(int argc, char **argv) { + std::string address("127.0.0.1:5672/examples"); + double interval = 1.0; + double timeout = 5.0; + + example::options opts(argc, argv); + + opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); + opts.add_value(interval, 'i', "interval", "send a message every INTERVAL seconds", "INTERVAL"); + opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T"); + + try { + opts.parse(); + scheduled_sender h(address, interval, timeout); + proton::container(h).run(); + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/selected_recv.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/selected_recv.cpp 
b/cpp/examples/selected_recv.cpp new file mode 100644 index 0000000..a7f9cea --- /dev/null +++ b/cpp/examples/selected_recv.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver_options.hpp> +#include <proton/source_options.hpp> + +#include <iostream> + +#include "fake_cpp11.hpp" + +namespace { + + // Example custom function to configure an AMQP filter, + // specifically an APACHE.ORG:SELECTOR + // (http://www.amqp.org/specification/1.0/filters) + + void set_filter(proton::source_options &opts, const std::string& selector_str) { + proton::source::filter_map map; + proton::symbol filter_key("selector"); + proton::value filter_value; + // The value is a specific AMQP "described type": binary string with symbolic descriptor + proton::codec::encoder enc(filter_value); + enc << proton::codec::start::described() + << proton::symbol("apache.org:selector-filter:string") + << selector_str + << proton::codec::finish(); + // In our case, the map has this one element + map.put(filter_key, filter_value); + opts.filters(map); + } +} + 
+ +class selected_recv : public proton::messaging_handler { + std::string conn_url_; + std::string addr_; + + public: + selected_recv(const std::string& u, const std::string& a) : + conn_url_(u), addr_(a) {} + + void on_container_start(proton::container &c) OVERRIDE { + proton::source_options opts; + set_filter(opts, "colour = 'green'"); + proton::connection conn = c.connect(conn_url_); + conn.open_receiver(addr_, proton::receiver_options().source(opts)); + } + + void on_message(proton::delivery &, proton::message &m) OVERRIDE { + std::cout << m.body() << std::endl; + } +}; + +int main(int argc, char **argv) { + try { + std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; + std::string addr = argc > 2 ? argv[2] : "examples"; + + selected_recv recv(conn_url, addr); + proton::container(recv).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/server.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/server.cpp b/cpp/examples/server.cpp new file mode 100644 index 0000000..8e177df --- /dev/null +++ b/cpp/examples/server.cpp @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> + +#include <iostream> +#include <map> +#include <string> +#include <cctype> + +#include "fake_cpp11.hpp" + +class server : public proton::messaging_handler { + std::string conn_url_; + std::string addr_; + proton::connection conn_; + std::map<std::string, proton::sender> senders_; + + public: + server(const std::string& u, const std::string& a) : + conn_url_(u), addr_(a) {} + + void on_container_start(proton::container& c) OVERRIDE { + conn_ = c.connect(conn_url_); + conn_.open_receiver(addr_); + + std::cout << "Server connected to " << conn_url_ << std::endl; + } + + std::string to_upper(const std::string& s) { + std::string uc(s); + size_t l = uc.size(); + + for (size_t i=0; i<l; i++) { + uc[i] = static_cast<char>(std::toupper(uc[i])); + } + + return uc; + } + + void on_message(proton::delivery&, proton::message& m) OVERRIDE { + std::cout << "Received " << m.body() << std::endl; + + std::string reply_to = m.reply_to(); + proton::message reply; + + reply.to(reply_to); + reply.body(to_upper(proton::get<std::string>(m.body()))); + reply.correlation_id(m.correlation_id()); + + if (!senders_[reply_to]) { + senders_[reply_to] = conn_.open_sender(reply_to); + } + + senders_[reply_to].send(reply); + } +}; + +int main(int argc, char** argv) { + try { + std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672"; + std::string addr = argc > 2 ? 
argv[2] : "examples"; + + server srv(conn_url, addr); + proton::container(srv).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/server_direct.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/server_direct.cpp b/cpp/examples/server_direct.cpp new file mode 100644 index 0000000..d46fc29 --- /dev/null +++ b/cpp/examples/server_direct.cpp @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "options.hpp" + +#include <proton/container.hpp> +#include <proton/listen_handler.hpp> +#include <proton/listener.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/sender.hpp> +#include <proton/sender_options.hpp> +#include <proton/source_options.hpp> +#include <proton/tracker.hpp> + +#include <iostream> +#include <map> +#include <string> +#include <sstream> +#include <cctype> + +#include "fake_cpp11.hpp" + +class server : public proton::messaging_handler { + private: + class listener_ready_handler : public proton::listen_handler { + void on_open(proton::listener& l) OVERRIDE { + std::cout << "listening on " << l.port() << std::endl; + } + }; + + typedef std::map<std::string, proton::sender> sender_map; + listener_ready_handler listen_handler; + std::string url; + sender_map senders; + int address_counter; + + public: + server(const std::string &u) : url(u), address_counter(0) {} + + void on_container_start(proton::container &c) OVERRIDE { + c.listen(url, listen_handler); + } + + std::string to_upper(const std::string &s) { + std::string uc(s); + size_t l = uc.size(); + + for (size_t i=0; i<l; i++) + uc[i] = static_cast<char>(std::toupper(uc[i])); + + return uc; + } + + std::string generate_address() { + std::ostringstream addr; + addr << "server" << address_counter++; + + return addr.str(); + } + + void on_sender_open(proton::sender &sender) OVERRIDE { + if (sender.source().dynamic()) { + std::string addr = generate_address(); + sender.open(proton::sender_options().source(proton::source_options().address(addr))); + senders[addr] = sender; + } + } + + void on_message(proton::delivery &, proton::message &m) OVERRIDE { + std::cout << "Received " << m.body() << std::endl; + + std::string reply_to = m.reply_to(); + sender_map::iterator it = senders.find(reply_to); + + if (it == senders.end()) { + std::cout << "No link for reply_to: " << reply_to << std::endl; + } else { 
+ proton::sender sender = it->second; + proton::message reply; + + reply.to(reply_to); + reply.body(to_upper(proton::get<std::string>(m.body()))); + reply.correlation_id(m.correlation_id()); + + sender.send(reply); + } + } +}; + +int main(int argc, char **argv) { + std::string address("amqp://127.0.0.1:5672/examples"); + example::options opts(argc, argv); + + opts.add_value(address, 'a', "address", "listen on URL", "URL"); + + try { + opts.parse(); + + server srv(address); + proton::container(srv).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/service_bus.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/service_bus.cpp b/cpp/examples/service_bus.cpp new file mode 100644 index 0000000..c99bca6 --- /dev/null +++ b/cpp/examples/service_bus.cpp @@ -0,0 +1,322 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * Service Bus example. 
+ * + * This is an example of using "Service Bus sessions" (not the same thing as an + * AMQP session) to selectively retrieve messages from a queue. The queue must + * be configured within Service Bus to support sessions. Service Bus uses the + * AMQP group_id message property to associate messages with a particular + * Service Bus session. It uses AMQP filters to specify which session is + * associated with a receiver. + * + * The mechanics for sending and receiving to other types of service bus queue + * are broadly the same, as long as the step using the + * receiver.source().filters() is omitted. + * + * Other Service Bus notes: There is no drain support, hence the need to use + * timeouts in this example to detect the end of the message stream. There is + * no browse support when setting the AMQP link distribution mode to COPY. + * Service Bus claims to support browsing, but it is unclear how to manage that + * with an AMQP client. Maximum message sizes (for body and headers) vary + * between queue types and fee tier ranging from 64KB to 1MB. Due to the + * distributed nature of Service Bus, queues do not automatically preserve FIFO + * order of messages unless the user takes steps to force the message stream to + * a single partition of the queue or creates the queue with partitioning disabled. + * + * This example shows use of the simpler SAS (Shared Access Signature) + * authentication scheme where the credentials are supplied on the connection. + * Service Bus does not actually check these credentials when setting up the + * connection, it merely caches the SAS key and policy (AKA key name) for later + * access authorization when creating senders and receivers. There is a second + * authentication scheme that allows for multiple tokens and even updating them + * within a long-lived connection which uses special management request-response + * queues in Service Bus.
The format of this exchange may be documented + * somewhere but is also available by working through the CbsAsyncExample.cs + * program in the Amqp.Net Lite project. + * + * The sample output for this program is: + + sent message: message 0 in service bus session "red" + sent message: message 1 in service bus session "green" + sent message: message 2 in service bus session "blue" + sent message: message 3 in service bus session "red" + sent message: message 4 in service bus session "black" + sent message: message 5 in service bus session "blue" + sent message: message 6 in service bus session "yellow" +receiving messages with session identifier "green" from queue ses_q1 + received message: message 1 in service bus session "green" +receiving messages with session identifier "red" from queue ses_q1 + received message: message 0 in service bus session "red" + received message: message 3 in service bus session "red" +receiving messages with session identifier "blue" from queue ses_q1 + received message: message 2 in service bus session "blue" + received message: message 5 in service bus session "blue" +receiving messages with session identifier "black" from queue ses_q1 + received message: message 4 in service bus session "black" +receiving messages with session identifier "yellow" from queue ses_q1 + received message: message 6 in service bus session "yellow" +Done. No more messages. 
+ + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver_options.hpp> +#include <proton/sender.hpp> +#include <proton/sender_options.hpp> +#include <proton/source_options.hpp> +#include <proton/tracker.hpp> +#include <proton/work_queue.hpp> + +#include <iostream> +#include <sstream> + +#include "fake_cpp11.hpp" + +using proton::source_options; +using proton::connection_options; +using proton::sender_options; +using proton::receiver_options; + +void do_next_sequence(); + +namespace { +void check_arg(const std::string &value, const std::string &name) { + if (value.empty()) + throw std::runtime_error("missing argument for \"" + name + "\""); +} +} + +/// Connect to Service Bus queue and retrieve messages in a particular session. +class session_receiver : public proton::messaging_handler { + private: + const std::string &connection_url; + const std::string &entity; + proton::value session_identifier; // AMQP null type by default, matches any Service Bus sequence identifier + int message_count; + bool closed; + proton::duration read_timeout; + proton::timestamp last_read; + proton::container *container; + proton::receiver receiver; + + public: + session_receiver(const std::string &c, const std::string &e, + const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) { + if (sid) + session_identifier = std::string(sid); + // session_identifier is now either empty/null or an AMQP string type. + // If null, Service Bus will pick the first available message and create + // a filter at its end with that message's session identifier. + // Technically, an AMQP string is not a valid filter-set value unless it + // is annotated as an AMQP described type, so this may change. 
+ + } + + void run (proton::container &c) { + message_count = 0; + closed = false; + c.connect(connection_url, connection_options().handler(*this)); + container = &c; + } + + void on_connection_open(proton::connection &connection) OVERRIDE { + proton::source::filter_map sb_filter_map; + proton::symbol key("com.microsoft:session-filter"); + sb_filter_map.put(key, session_identifier); + receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map))); + + // Start timeout processing here. If Service Bus has no pending + // messages, it may defer completing the receiver open until a message + // becomes available (e.g. to be able to set the actual session + // identifier if none was specified). + last_read = proton::timestamp::now(); + // Call this->process_timeout after read_timeout. + container->schedule(read_timeout, [this]() { this->process_timeout(); }); + } + + void on_receiver_open(proton::receiver &r) OVERRIDE { + if (closed) return; // PROTON-1264 + proton::value actual_session_id = r.source().filters().get("com.microsoft:session-filter"); + std::cout << "receiving messages with session identifier \"" << actual_session_id + << "\" from queue " << entity << std::endl; + last_read = proton::timestamp::now(); + } + + void on_message(proton::delivery &, proton::message &m) OVERRIDE { + message_count++; + std::cout << " received message: " << m.body() << std::endl; + last_read = proton::timestamp::now(); + } + + void process_timeout() { + proton::timestamp deadline = last_read + read_timeout; + proton::timestamp now = proton::timestamp::now(); + if (now >= deadline) { + receiver.close(); + closed = true; + receiver.connection().close(); + if (message_count) + do_next_sequence(); + else + std::cout << "Done. No more messages." 
<< std::endl; + } else { + proton::duration next = deadline - now; + container->schedule(next, [this]() { this->process_timeout(); }); + } + } +}; + + +/// Connect to Service Bus queue and send messages divided into different sessions. +class session_sender : public proton::messaging_handler { + private: + const std::string &connection_url; + const std::string &entity; + int msg_count; + int total; + int accepts; + + public: + session_sender(const std::string &c, const std::string &e) : connection_url(c), entity(e), + msg_count(0), total(7), accepts(0) {} + + void run(proton::container &c) { + c.open_sender(connection_url + "/" + entity, sender_options(), connection_options().handler(*this)); + } + + void send_remaining_messages(proton::sender &s) { + std::string gid; + for (; msg_count < total && s.credit() > 0; msg_count++) { + switch (msg_count) { + case 0: gid = "red"; break; + case 1: gid = "green"; break; + case 2: gid = "blue"; break; + case 3: gid = "red"; break; + case 4: gid = "black"; break; + case 5: gid = "blue"; break; + case 6: gid = "yellow"; break; + } + + std::ostringstream mbody; + mbody << "message " << msg_count << " in service bus session \"" << gid << "\""; + proton::message m(mbody.str()); + m.group_id(gid); // Service Bus uses the group_id property to as the session identifier. + s.send(m); + std::cout << " sent message: " << m.body() << std::endl; + } + } + + void on_sendable(proton::sender &s) OVERRIDE { + send_remaining_messages(s); + } + + void on_tracker_accept(proton::tracker &t) OVERRIDE { + accepts++; + if (accepts == total) { + // upload complete + t.sender().close(); + t.sender().connection().close(); + do_next_sequence(); + } + } +}; + + +/// Orchestrate the sequential actions of sending and receiving session-based messages. 
+class sequence : public proton::messaging_handler { + private: + proton::container *container; + int sequence_no; + session_sender snd; + session_receiver rcv_red, rcv_green, rcv_null; + + public: + static sequence *the_sequence; + + sequence (const std::string &c, const std::string &e) : + container(0), sequence_no(0), + snd(c, e), rcv_red(c, e, "red"), rcv_green(c, e, "green"), rcv_null(c, e, NULL) { + the_sequence = this; + } + + void on_container_start(proton::container &c) OVERRIDE { + container = &c; + next_sequence(); + } + + void next_sequence() { + switch (sequence_no++) { + // run these in order exactly once + case 0: snd.run(*container); break; + case 1: rcv_green.run(*container); break; + case 2: rcv_red.run(*container); break; + // Run this until the receiver decides there is no messages left to sequence through + default: rcv_null.run(*container); break; + } + } +}; + +sequence *sequence::the_sequence = NULL; + +void do_next_sequence() { sequence::the_sequence->next_sequence(); } + + +int main(int argc, char **argv) { + std::string sb_namespace; // i.e. "foo.servicebus.windows.net" + // Make sure the next two are urlencoded for Proton + std::string sb_key_name; // shared access key name for entity (AKA "Policy Name") + std::string sb_key; // shared access key + std::string sb_entity; // AKA the service bus queue. Must enable + // sessions on it for this example. 
+ + example::options opts(argc, argv); + opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE"); + opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY"); + opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key"); + opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY"); + + try { + opts.parse(); + check_arg(sb_namespace, "namespace"); + check_arg(sb_key_name, "policy"); + check_arg(sb_key, "key"); + check_arg(sb_entity, "entity"); + std::string connection_string("amqps://" + sb_key_name + ":" + sb_key + "@" + sb_namespace); + + sequence seq(connection_string, sb_entity); + proton::container(seq).run(); + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/simple_connect.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/simple_connect.cpp b/cpp/examples/simple_connect.cpp new file mode 100644 index 0000000..74a8c87 --- /dev/null +++ b/cpp/examples/simple_connect.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/reconnect_options.hpp> + +#include <iostream> + +#include "fake_cpp11.hpp" + +class simple_connect : public proton::messaging_handler { + private: + std::string url; + std::string user; + std::string password; + bool reconnect; + bool sasl; + std::string mechs; + bool insecure; + proton::connection connection; + + public: + simple_connect(const std::string &a, const std::string &u, const std::string &p, + bool r, bool s, const std::string& ms, bool in) : + url(a), user(u), password(p), + reconnect(r), sasl(s), mechs(ms), insecure(in) {} + + void on_container_start(proton::container &c) OVERRIDE { + proton::connection_options co; + if (!user.empty()) co.user(user); + if (!password.empty()) co.password(password); + if (reconnect) co.reconnect(proton::reconnect_options()); + if (sasl) co.sasl_enabled(true); + // + // NB: We only set sasl options if they are not default to avoid + // forcing SASL negotiation on when it's not needed. + // + // This is because the SASL negotiation is turned off unless + // it is needed. Setting a username/password or any SASL option will + // force the SASL negotiation to be turned on. 
+ // + if (!mechs.empty()) co.sasl_allowed_mechs(mechs); + if (insecure) co.sasl_allow_insecure_mechs(true); + connection = c.connect(url, co); + } + + void on_connection_open(proton::connection &c) OVERRIDE { + c.close(); + } + + void on_error(const proton::error_condition& e) OVERRIDE { + throw std::runtime_error(e.what()); + } +}; + +int main(int argc, char **argv) { + std::string address("127.0.0.1:5672/examples"); + std::string user; + std::string password; + bool reconnect = false; + bool sasl = false; + std::string mechs; + bool insecure = false; + example::options opts(argc, argv); + + opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); + opts.add_value(user, 'u', "user", "authenticate as USER", "USER"); + opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD"); + opts.add_flag(reconnect, 'r', "reconnect", "reconnect on connection failure"); + opts.add_flag(sasl,'s', "sasl", "force SASL authentication with no user specified (Use for Kerberos/GSSAPI)"); + opts.add_value(mechs, 'm', "mechs", "allowed SASL mechanisms", "MECHS"); + opts.add_flag(insecure, 'i', "insecure", "allow clear-text passwords"); + + try { + opts.parse(); + + simple_connect connect(address, user, password, reconnect, sasl, mechs, insecure); + proton::container(connect).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/simple_recv.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/simple_recv.cpp b/cpp/examples/simple_recv.cpp new file mode 100644 index 0000000..5a7cde4 --- /dev/null +++ b/cpp/examples/simple_recv.cpp @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/link.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/value.hpp> + +#include <iostream> +#include <map> + +#include "fake_cpp11.hpp" + +class simple_recv : public proton::messaging_handler { + private: + std::string url; + std::string user; + std::string password; + proton::receiver receiver; + int expected; + int received; + + public: + simple_recv(const std::string &s, const std::string &u, const std::string &p, int c) : + url(s), user(u), password(p), expected(c), received(0) {} + + void on_container_start(proton::container &c) OVERRIDE { + proton::connection_options co; + if (!user.empty()) co.user(user); + if (!password.empty()) co.password(password); + receiver = c.open_receiver(url, co); + } + + void on_message(proton::delivery &d, proton::message &msg) OVERRIDE { + if (proton::coerce<int>(msg.id()) < received) { + return; // Ignore duplicate + } + + if (expected == 0 || received < expected) { + std::cout << msg.body() << std::endl; + received++; + + if (received == expected) { + 
d.receiver().close(); + d.connection().close(); + } + } + } +}; + +int main(int argc, char **argv) { + std::string address("127.0.0.1:5672/examples"); + std::string user; + std::string password; + int message_count = 100; + example::options opts(argc, argv); + + opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL"); + opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT"); + opts.add_value(user, 'u', "user", "authenticate as USER", "USER"); + opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD"); + + + try { + opts.parse(); + + simple_recv recv(address, user, password, message_count); + proton::container(recv).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org