PROTON-1847: [C++ binding] Ensure that excessive scheduled events can't starve other events
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9fd19bcf Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9fd19bcf Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9fd19bcf Branch: refs/heads/go1 Commit: 9fd19bcf960dacc046b2f6867c538149eceb8e5c Parents: 1764c4d Author: Andrew Stitcher <astitc...@apache.org> Authored: Wed May 16 20:15:08 2018 -0400 Committer: Andrew Stitcher <astitc...@apache.org> Committed: Wed May 16 20:17:31 2018 -0400 ---------------------------------------------------------------------- cpp/src/proactor_container_impl.cpp | 80 +++++++++++++++++++++----------- cpp/src/proactor_container_impl.hpp | 3 +- 2 files changed, 54 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fd19bcf/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp index f865a16..3a89816 100644 --- a/cpp/src/proactor_container_impl.cpp +++ b/cpp/src/proactor_container_impl.cpp @@ -459,31 +459,54 @@ void container::impl::receiver_options(const proton::receiver_options &opts) { void container::impl::run_timer_jobs() { timestamp now = timestamp::now(); + std::vector<scheduled> tasks; - // Check head of timer queue - for (;;) { - work task; - { - GUARD(deferred_lock_); - if ( deferred_.size()==0 ) return; + // We first extract all the runnable tasks and then run them - this is to avoid having tasks + // injected as we are running them (which could potentially never end) + { + GUARD(deferred_lock_); - timestamp next_time = deferred_.front().time; + // Figure out how many tasks we need to execute and pop them to the back of the + // queue (in reverse order) + int i = 0; + for (;;) { + // Have we seen all the queued tasks? + if ( deferred_.size()-i==0 ) break; - if (next_time>now) { + // Is the next task in the future? + timestamp next_time = deferred_.front().time; + if ( next_time>now ) { pn_proactor_set_timeout(proactor_, (next_time-now).milliseconds()); - return; + break; } - task = deferred_.front().task; - std::pop_heap(deferred_.begin(), deferred_.end()); - deferred_.pop_back(); + std::pop_heap(deferred_.begin(), deferred_.end()-i); + ++i; + } + // Nothing to do + if ( i==0 ) return; + + // Now we know how many tasks to run + if ( deferred_.size()==i ) { + // If we sorted the entire heap, then we're executing every task + // so don't need to copy and can just swap + tasks.swap(deferred_); + } else { + // Otherwise just copy the ones we sorted + tasks = std::vector<scheduled>(deferred_.end()-i, deferred_.end()); + + // Remove tasks to be executed + deferred_.resize(deferred_.size()-i); } - task(); } + // We've now taken the tasks to run from the deferred tasks + // so we can run them unlocked + // NB. We copied the due tasks in reverse order so execute from end + for (int i = tasks.size()-1; i>=0; --i) tasks[i].task(); } // Return true if this thread is finished -bool container::impl::handle(pn_event_t* event) { +container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) { // If we have any pending connection work, do it now pn_connection_t* c = pn_event_connection(event); @@ -498,14 +521,14 @@ bool container::impl::handle(pn_event_t* event) { case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ // If we're stopping interrupt all other threads still running if (auto_stop_) pn_proactor_interrupt(proactor_); - return false; + return ContinueLoop; // We only interrupt to stop threads case PN_PROACTOR_INTERRUPT: { // Interrupt any other threads still running GUARD(lock_); if (threads_>1) pn_proactor_interrupt(proactor_); - return true; + return EndLoop; } case PN_PROACTOR_TIMEOUT: { @@ -523,7 +546,7 @@ bool container::impl::handle(pn_event_t* event) { for (work_queues::iterator queue = queues.begin(); queue!=queues.end(); ++queue) { (*queue)->run_all_jobs(); } - return false; + return EndBatch; } case PN_LISTENER_OPEN: { pn_listener_t* l = pn_event_listener(event); @@ -537,7 +560,7 @@ bool container::impl::handle(pn_event_t* event) { listener lstnr(l); handler->on_open(lstnr); } - return false; + return ContinueLoop; } case PN_LISTENER_ACCEPT: { pn_listener_t* l = pn_event_listener(event); @@ -568,7 +591,7 @@ bool container::impl::handle(pn_event_t* event) { pn_transport_set_server(pnt); opts.apply_unbound_server(pnt); pn_listener_accept2(l, c, pnt); - return false; + return ContinueLoop; } case PN_LISTENER_CLOSE: { pn_listener_t* l = pn_event_listener(event); @@ -586,15 +609,15 @@ bool container::impl::handle(pn_event_t* event) { } handler->on_close(lstnr); } - return false; + return ContinueLoop; } // Connection driver will bind a new transport to the connection at this point case PN_CONNECTION_INIT: - return false; + return ContinueLoop; // We've already applied options, so don't need to do it here case PN_CONNECTION_BOUND: - return false; + return ContinueLoop; case PN_CONNECTION_REMOTE_OPEN: { // This is the only event that we get indicating that the connection succeeded so @@ -618,7 +641,7 @@ bool container::impl::handle(pn_event_t* event) { pn_condition_t* tc = pn_transport_condition(t); pn_condition_copy(tc, cc); pn_connection_close(c); - return false; + return ContinueLoop; } break; } @@ -628,7 +651,7 @@ bool container::impl::handle(pn_event_t* event) { pn_transport_t* t = pn_event_transport(event); // If we successfully schedule a re-connect then hide the event from // user handlers by returning here. - if (pn_condition_is_set(pn_transport_condition(t)) && setup_reconnect(c)) return false; + if (pn_condition_is_set(pn_transport_condition(t)) && setup_reconnect(c)) return ContinueLoop; // Otherwise, this connection will be freed by the proactor. // Mark its work_queue finished so it won't try to use the freed connection. connection_context::get(c).work_queue_.impl_.get()->finished(); @@ -657,10 +680,10 @@ bool container::impl::handle(pn_event_t* event) { // If we still have no handler don't do anything! // This is pretty unusual, but possible if we use the default constructor for container - if (!mh) return false; + if (!mh) return ContinueLoop; messaging_adapter::dispatch(*mh, event); - return false; + return ContinueLoop; } void container::impl::thread() { @@ -676,8 +699,9 @@ void container::impl::thread() { error_condition error; try { while ((e = pn_event_batch_next(events))) { - finished = handle(e); - if (finished) break; + dispatch_result r = dispatch(e); + finished = r==EndLoop; + if (r!=ContinueLoop) break; } } catch (const std::exception& e) { // If we caught an exception then shutdown the (other threads of the) container http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fd19bcf/cpp/src/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/cpp/src/proactor_container_impl.hpp b/cpp/src/proactor_container_impl.hpp index 43b695f..dcc9381 100644 --- a/cpp/src/proactor_container_impl.hpp +++ b/cpp/src/proactor_container_impl.hpp @@ -110,7 +110,8 @@ class container::impl { // Event loop to run in each container thread void thread(); - bool handle(pn_event_t*); + enum dispatch_result {ContinueLoop, EndBatch, EndLoop}; + dispatch_result dispatch(pn_event_t*); void run_timer_jobs(); int threads_; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org