Repository: qpid-proton Updated Branches: refs/heads/master 2b5cfc817 -> a22b17896
PROTON-1479: Fix scheduled_send examples to send and close in a safe context Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a22b1789 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a22b1789 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a22b1789 Branch: refs/heads/master Commit: a22b17896ab0bd0a065b1718a5bbbedc1eeca229 Parents: 2b5cfc8 Author: Andrew Stitcher <[email protected]> Authored: Wed May 3 23:42:56 2017 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Fri May 12 18:34:42 2017 -0400 ---------------------------------------------------------------------- examples/cpp/scheduled_send.cpp | 11 ++++-- examples/cpp/scheduled_send_03.cpp | 62 +++++++++++++++++++++++---------- 2 files changed, 52 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a22b1789/examples/cpp/scheduled_send.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/scheduled_send.cpp b/examples/cpp/scheduled_send.cpp index ef6cd27..de04c3b 100644 --- a/examples/cpp/scheduled_send.cpp +++ b/examples/cpp/scheduled_send.cpp @@ -23,6 +23,7 @@ #include <proton/container.hpp> #include <proton/default_container.hpp> +#include <proton/event_loop.hpp> #include <proton/message.hpp> #include <proton/messaging_handler.hpp> #include <proton/sender.hpp> @@ -39,6 +40,7 @@ class scheduled_sender : public proton::messaging_handler { std::string url; proton::sender sender; proton::duration interval, timeout; + proton::event_loop* event_loop; bool ready, canceled; public: @@ -51,12 +53,15 @@ class scheduled_sender : public proton::messaging_handler { canceled(false) // Canceled. {} + // The awkward looking double lambda is necessary because the scheduled lambdas run in the container context + // and must arrange lambdas for send and close to happen in the connection context. void on_container_start(proton::container &c) OVERRIDE { sender = c.open_sender(url); + event_loop = &proton::make_thread_safe(sender).get()->event_loop(); // Call this->cancel after timeout. - c.schedule(timeout, [this]() { this->cancel(); }); + c.schedule(timeout, [this]() { this->event_loop->inject( [this]() { this->cancel(); }); }); // Start regular ticks every interval. - c.schedule(interval, [this]() { this->tick(); }); + c.schedule(interval, [this]() { this->event_loop->inject( [this]() { this->tick(); }); }); } void cancel() { @@ -67,7 +72,7 @@ class scheduled_sender : public proton::messaging_handler { void tick() { // Schedule the next tick unless we have been cancelled. if (!canceled) - sender.container().schedule(interval, [this]() { this->tick(); }); + sender.container().schedule(interval, [this]() { this->event_loop->inject( [this]() { this->tick(); }); }); if (sender.credit() > 0) // Only send if we have credit send(); else http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a22b1789/examples/cpp/scheduled_send_03.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/scheduled_send_03.cpp b/examples/cpp/scheduled_send_03.cpp index 92e5767..d106d29 100644 --- a/examples/cpp/scheduled_send_03.cpp +++ b/examples/cpp/scheduled_send_03.cpp @@ -25,6 +25,7 @@ #include <proton/connection.hpp> #include <proton/default_container.hpp> #include <proton/duration.hpp> +#include <proton/event_loop.hpp> #include <proton/function.hpp> #include <proton/message.hpp> #include <proton/messaging_handler.hpp> @@ -41,24 +42,42 @@ class scheduled_sender : public proton::messaging_handler { private: std::string url; - proton::sender sender; proton::duration interval, timeout; + proton::event_loop *event_loop; bool ready, canceled; + struct cancel_fn : public proton::void_function0 { + scheduled_sender* parent; + proton::sender sender; + cancel_fn(): parent(0) {} + cancel_fn(scheduled_sender& ss, proton::sender& s) : parent(&ss), sender(s) {} + void operator()() { if (parent) parent->cancel(sender); } + }; + struct tick_fn : public proton::void_function0 { + scheduled_sender* parent; + proton::sender sender; + tick_fn(): parent(0) {} + tick_fn(scheduled_sender& ss, proton::sender& s) : parent(&ss), sender(s) {} + void operator()() { if (parent) parent->tick(sender); } + }; + + struct defer_cancel_fn : public proton::void_function0 { scheduled_sender& parent; - tick_fn(scheduled_sender& ss) : parent(ss) {} - void operator()() { parent.tick(); } + defer_cancel_fn(scheduled_sender& ss) : parent(ss) {} + void operator()() { parent.event_loop->inject(parent.do_cancel); } }; - struct cancel_fn : public proton::void_function0 { + struct defer_tick_fn : public proton::void_function0 { scheduled_sender& parent; - cancel_fn(scheduled_sender& ss) : parent(ss) {} - void operator()() { parent.cancel(); } + defer_tick_fn(scheduled_sender& ss) : parent(ss) {} + void operator()() { parent.event_loop->inject(parent.do_tick); } }; tick_fn do_tick; cancel_fn do_cancel; + defer_tick_fn defer_tick; + defer_cancel_fn defer_cancel; public: @@ -68,37 +87,44 @@ class scheduled_sender : public proton::messaging_handler { timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout. ready(true), // Ready to send. canceled(false), // Canceled. - do_tick(*this), - do_cancel(*this) + defer_tick(*this), + defer_cancel(*this) {} void on_container_start(proton::container &c) OVERRIDE { - sender = c.open_sender(url); - c.schedule(timeout, do_cancel); // Call this->cancel after timeout. - c.schedule(interval, do_tick); // Start regular ticks every interval. + c.open_sender(url); + } + + void on_sender_open(proton::sender & s) OVERRIDE { + event_loop = &proton::make_thread_safe(s).get()->event_loop(); + + do_cancel = cancel_fn(*this, s); + do_tick = tick_fn(*this, s); + s.container().schedule(timeout, defer_cancel); // Call this->cancel after timeout. + s.container().schedule(interval, defer_tick); // Start regular ticks every interval. } - void cancel() { + void cancel(proton::sender& sender) { canceled = true; sender.connection().close(); } - void tick() { + void tick(proton::sender& sender) { if (!canceled) { - sender.container().schedule(interval, do_tick); // Next tick + sender.container().schedule(interval, defer_tick); // Next tick if (sender.credit() > 0) // Only send if we have credit - send(); + send(sender); else ready = true; // Set the ready flag, send as soon as we get credit. } } - void on_sendable(proton::sender &) OVERRIDE { + void on_sendable(proton::sender &sender) OVERRIDE { if (ready) // We have been ticked since the last send. - send(); + send(sender); } - void send() { + void send(proton::sender& sender) { std::cout << "send" << std::endl; sender.send(proton::message("ping")); ready = false; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
