Repository: qpid-proton Updated Branches: refs/heads/master ffacd0f69 -> d24bd64b9
PROTON-1267: Service Bus example Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d24bd64b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d24bd64b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d24bd64b Branch: refs/heads/master Commit: d24bd64b9797e4a9a9400004ffe13481db6e6752 Parents: ffacd0f Author: Clifford Jansen <[email protected]> Authored: Fri Jul 22 07:40:35 2016 -0700 Committer: Clifford Jansen <[email protected]> Committed: Fri Jul 22 07:40:35 2016 -0700 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 1 + examples/cpp/README.dox | 7 + examples/cpp/service_bus.cpp | 329 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 337 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d24bd64b/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 5ec2889..017ef3f 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -53,6 +53,7 @@ foreach(example flow_control ssl ssl_client_cert + service_bus encode_decode) add_executable(${example} ${example}.cpp) endforeach() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d24bd64b/examples/cpp/README.dox ---------------------------------------------------------------------- diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox index be088be..4d28293 100644 --- a/examples/cpp/README.dox +++ b/examples/cpp/README.dox @@ -154,3 +154,10 @@ C++03 compatible way. See @ref schedule_send.cpp for a more convenient approach using std::function if you have C++11. */ + +/** @example service_bus.cpp + +A working example for accessing Service Bus session-enabled queues. +Also provides some general notes on Service Bus usage. 
+ +*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d24bd64b/examples/cpp/service_bus.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/service_bus.cpp b/examples/cpp/service_bus.cpp new file mode 100644 index 0000000..ae6accc --- /dev/null +++ b/examples/cpp/service_bus.cpp @@ -0,0 +1,329 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * Service Bus example. + * + * This is an example of using "Service Bus sessions" (not the same thing as an + * AMQP session) to selectively retrieve messages from a queue. The queue must + * be configured within Service Bus to support sessions. Service Bus uses the + * AMQP group_id message property to associate messages with a particular + * Service Bus session. It uses AMQP filters to specify which session is + * associated with a receiver. + * + * The mechanics for sending and receiving to other types of service bus queue + * are broadly the same, as long as the step using the + * receiver.source().filters() is omitted. + * + * Other Service Bus notes: There is no drain support, hence the need to use + * timeouts in this example to detect the end of the message stream. 
There is + * no browse support when setting the AMQP link distribution mode to COPY. + * Service Bus claims to support browsing, but it is unclear how to manage that + * with an AMQP client. Maximum message sizes (for body and headers) vary + * between queue types and fee tiers, ranging from 64KB to 1MB. Due to the + * distributed nature of Service Bus, queues do not automatically preserve FIFO + * order of messages unless the user takes steps to force the message stream to + * a single partition of the queue or creates the queue with partitioning disabled. + * + * This example shows use of the simpler SAS (Shared Access Signature) + * authentication scheme where the credentials are supplied on the connection. + * Service Bus does not actually check these credentials when setting up the + * connection; it merely caches the SAS key and policy (AKA key name) for later + * access authorization when creating senders and receivers. There is a second + * authentication scheme that allows for multiple tokens and even updating them + * within a long-lived connection which uses special management request-response + * queues in Service Bus. The format of this exchange may be documented + * somewhere but is also available by working through the CbsAsyncExample.cs + * program in the Amqp.Net Lite project. 
+ * + * The sample output for this program is: + + sent message: message 0 in service bus session "red" + sent message: message 1 in service bus session "green" + sent message: message 2 in service bus session "blue" + sent message: message 3 in service bus session "red" + sent message: message 4 in service bus session "black" + sent message: message 5 in service bus session "blue" + sent message: message 6 in service bus session "yellow" +receiving messages with session identifier "green" from queue ses_q1 + received message: message 1 in service bus session "green" +receiving messages with session identifier "red" from queue ses_q1 + received message: message 0 in service bus session "red" + received message: message 3 in service bus session "red" +receiving messages with session identifier "blue" from queue ses_q1 + received message: message 2 in service bus session "blue" + received message: message 5 in service bus session "blue" +receiving messages with session identifier "black" from queue ses_q1 + received message: message 4 in service bus session "black" +receiving messages with session identifier "yellow" from queue ses_q1 + received message: message 6 in service bus session "yellow" +Done. No more messages. 
+ + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/default_container.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/sender.hpp> +#include <proton/tracker.hpp> +#include <proton/delivery.hpp> +#include <proton/url.hpp> +#include <proton/source_options.hpp> + +#include <iostream> +#include <sstream> + +#include "fake_cpp11.hpp" + +using proton::source_options; +using proton::connection_options; +using proton::sender_options; +using proton::receiver_options; + +void do_next_sequence(); + +void check_arg(const std::string &value, const std::string &name) { + if (value.empty()) + throw std::runtime_error("missing argument for \"" + name + "\""); +} + + +/// Connect to Service Bus queue and retrieve messages in a particular session. +class session_receiver : public proton::messaging_handler { + private: + const std::string &connection_url; + const std::string &entity; + proton::value session_identifier; // AMQP null type by default, matches any Service Bus session identifier + int message_count; + bool closed; + proton::duration read_timeout; + proton::timestamp last_read; + proton::container *container; + proton::receiver receiver; + + + struct process_timeout_fn : public proton::void_function0 { + session_receiver& parent; + process_timeout_fn(session_receiver& sr) : parent(sr) {} + void operator()() { parent.process_timeout(); } + }; + + process_timeout_fn do_process_timeout; + + + public: + session_receiver(const std::string &c, const std::string &e, + const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), + last_read(0), container(0), do_process_timeout(*this) { + if (sid) + session_identifier = std::string(sid); + // session_identifier is now either empty/null or an AMQP string type. + // If null, Service Bus will pick the first available message and create + // a filter at its end with that message's session identifier. 
+ // Technically, an AMQP string is not a valid filter-set value unless it + // is annotated as an AMQP described type, so this may change. + + } + + void run (proton::container &c) { + message_count = 0; + closed = false; + c.connect(connection_url, connection_options().handler(*this)); + container = &c; + } + + void on_connection_open(proton::connection &connection) OVERRIDE { + proton::source::filter_map sb_filter_map; + proton::symbol key("com.microsoft:session-filter"); + sb_filter_map.put(key, session_identifier); + receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map))); + + // Start timeout processing here. If Service Bus has no pending + // messages, it may defer completing the receiver open until a message + // becomes available (e.g. to be able to set the actual session + // identifier if none was specified). + last_read = proton::timestamp::now(); + // Call this->process_timeout after read_timeout. + container->schedule(read_timeout, do_process_timeout); + } + + void on_receiver_open(proton::receiver &r) OVERRIDE { + if (closed) return; // PROTON-1264 + proton::value actual_session_id = r.source().filters().get("com.microsoft:session-filter"); + std::cout << "receiving messages with session identifier \"" << actual_session_id + << "\" from queue " << entity << std::endl; + last_read = proton::timestamp::now(); + } + + void on_message(proton::delivery &d, proton::message &m) OVERRIDE { + message_count++; + std::cout << " received message: " << m.body() << std::endl; + last_read = proton::timestamp::now(); + } + + void process_timeout() { + proton::timestamp deadline = last_read + read_timeout; + proton::timestamp now = proton::timestamp::now(); + if (now >= deadline) { + receiver.close(); + closed = true; + receiver.connection().close(); + if (message_count) + do_next_sequence(); + else + std::cout << "Done. No more messages." 
<< std::endl; + } else { + proton::duration next = deadline - now; + container->schedule(next, do_process_timeout); + } + } +}; + + +/// Connect to Service Bus queue and send messages divided into different sessions. +class session_sender : public proton::messaging_handler { + private: + const std::string &connection_url; + const std::string &entity; + int msg_count; + int total; + int accepts; + + public: + session_sender(const std::string &c, const std::string &e) : connection_url(c), entity(e), + msg_count(0), total(7), accepts(0) {} + + void run(proton::container &c) { + c.open_sender(connection_url + "/" + entity, sender_options(), connection_options().handler(*this)); + } + + void send_remaining_messages(proton::sender &s) { + std::string gid; + for (; msg_count < total && s.credit() > 0; msg_count++) { + switch (msg_count) { + case 0: gid = "red"; break; + case 1: gid = "green"; break; + case 2: gid = "blue"; break; + case 3: gid = "red"; break; + case 4: gid = "black"; break; + case 5: gid = "blue"; break; + case 6: gid = "yellow"; break; + } + + std::ostringstream mbody; + mbody << "message " << msg_count << " in service bus session \"" << gid << "\""; + proton::message m(mbody.str()); + m.group_id(gid); // Service Bus uses the group_id property as the session identifier. + s.send(m); + std::cout << " sent message: " << m.body() << std::endl; + } + } + + void on_sendable(proton::sender &s) OVERRIDE { + send_remaining_messages(s); + } + + void on_tracker_accept(proton::tracker &t) { + accepts++; + if (accepts == total) { + // upload complete + t.sender().close(); + t.sender().connection().close(); + do_next_sequence(); + } + } +}; + + +/// Orchestrate the sequential actions of sending and receiving session-based messages. 
+class sequence : public proton::messaging_handler { + private: + const std::string &connection_url, &entity; + proton::container *container; + int sequence_no; + session_sender snd; + session_receiver rcv_red, rcv_green, rcv_null; + + public: + static sequence *the_sequence; + + sequence (const std::string &c, const std::string &e) : connection_url(c), entity(e), sequence_no(0), + snd(c, e), rcv_red(c, e, "red"), rcv_green(c, e, "green"), rcv_null(c, e, NULL) { + the_sequence = this; + } + + void on_container_start(proton::container &c) OVERRIDE { + container = &c; + next_sequence(); + } + + void next_sequence() { + switch (sequence_no++) { + // run these in order exactly once + case 0: snd.run(*container); break; + case 1: rcv_green.run(*container); break; + case 2: rcv_red.run(*container); break; + // Run this until the receiver decides there are no messages left to sequence through + default: rcv_null.run(*container); break; + } + } +}; + +sequence *sequence::the_sequence = NULL; + +void do_next_sequence() { sequence::the_sequence->next_sequence(); } + + +int main(int argc, char **argv) { + std::string sb_namespace; // e.g. "foo.servicebus.windows.net" + // Make sure the next two are urlencoded for Proton + std::string sb_key_name; // shared access key name for entity (AKA "Policy Name") + std::string sb_key; // shared access key + std::string sb_entity; // AKA the service bus queue. Must enable + // sessions on it for this example. 
+ + example::options opts(argc, argv); + opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE"); + opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY"); + opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key"); + opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY"); + + try { + opts.parse(); + check_arg(sb_namespace, "namespace"); + check_arg(sb_key_name, "policy"); + check_arg(sb_key, "key"); + check_arg(sb_entity, "entity"); + std::string connection_string("amqps://" + sb_key_name + ":" + sb_key + "@" + sb_namespace); + + sequence seq(connection_string, sb_entity); + proton::default_container(seq).run(); + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
