Repository: qpid-proton Updated Branches: refs/heads/master a72320ea0 -> 92b822c10
PROTON-1243: Stop pn_reactor_stop from causing an infinite loop when called from within callback Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/92b822c1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/92b822c1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/92b822c1 Branch: refs/heads/master Commit: 92b822c1077f08e68934a630ddef1c17bce24171 Parents: a72320e Author: Andrew Stitcher <[email protected]> Authored: Thu Jul 28 23:49:27 2016 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Wed Aug 3 09:37:11 2016 -0400 ---------------------------------------------------------------------- proton-c/bindings/cpp/src/container_impl.cpp | 1 + proton-c/bindings/cpp/src/container_test.cpp | 72 ++++++++++++++++---- proton-c/bindings/python/proton/reactor.py | 5 +- proton-c/src/reactor/reactor.c | 28 ++++---- .../qpid/proton/reactor/impl/ReactorImpl.java | 11 +-- 5 files changed, 84 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b822c1/proton-c/bindings/cpp/src/container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp index 52ae42b..f2cad7f 100644 --- a/proton-c/bindings/cpp/src/container_impl.cpp +++ b/proton-c/bindings/cpp/src/container_impl.cpp @@ -354,6 +354,7 @@ void container_impl::run() { void container_impl::stop(const error_condition&) { reactor_.stop(); + auto_stop_ = true; } void container_impl::auto_stop(bool set) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b822c1/proton-c/bindings/cpp/src/container_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp index e6e02a0..1a3e2c4 100644 --- a/proton-c/bindings/cpp/src/container_test.cpp +++ b/proton-c/bindings/cpp/src/container_test.cpp @@ -40,6 +40,22 @@ static std::string int2string(int n) { return strm.str(); } +int listen_on_random_port(proton::container& c, proton::listener& l) { + int port; + // I'm going to hell for this: + srand((unsigned int)time(0)); + while (true) { + port = 20000 + (rand() % 30000); + try { + l = c.listen("0.0.0.0:" + int2string(port)); + break; + } catch (...) { + // keep trying + } + } + return port; +} + class test_handler : public proton::messaging_handler { public: const std::string host; @@ -55,19 +71,7 @@ class test_handler : public proton::messaging_handler { {} void on_container_start(proton::container &c) PN_CPP_OVERRIDE { - int port; - - // I'm going to hell for this: - srand((unsigned int)time(0)); - while (true) { - port = 20000 + (rand() % 30000); - try { - listener = c.listen("0.0.0.0:" + int2string(port)); - break; - } catch (...) { - // keep trying - } - } + int port = listen_on_random_port(c, listener); proton::connection conn = c.connect(host + ":" + int2string(port), opts); } @@ -143,6 +147,47 @@ int test_container_bad_address() { return 0; } +class stop_tester : public proton::messaging_handler { + proton::listener listener; + + // Set up a listener which would block forever + void on_container_start(proton::container& c) PN_CPP_OVERRIDE { + ASSERT(state==0); + int port = listen_on_random_port(c, listener); + c.connect("127.0.0.1:" + int2string(port)); + c.auto_stop(false); + state = 1; + } + + // Get here twice - once for listener, once for connector + void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE { + c.close(); + state++; + } + + void on_connection_close(proton::connection &c) PN_CPP_OVERRIDE { + ASSERT(state==3); + c.container().stop(); + state = 4; + } + void on_container_stop(proton::container & ) PN_CPP_OVERRIDE { + ASSERT(state==4); + state = 5; + } + +public: + stop_tester(): state(0) {} + + int state; +}; + +int test_container_stop() { + stop_tester t; + proton::default_container(t).run(); + ASSERT(t.state==5); + return 0; +} + } int main(int, char**) { @@ -151,6 +196,7 @@ int main(int, char**) { RUN_TEST(failed, test_container_default_vhost()); RUN_TEST(failed, test_container_no_vhost()); RUN_TEST(failed, test_container_bad_address()); + RUN_TEST(failed, test_container_stop()); return failed; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b822c1/proton-c/bindings/python/proton/reactor.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py index 202820c..ee9cfde 100644 --- a/proton-c/bindings/python/proton/reactor.py +++ b/proton-c/bindings/python/proton/reactor.py @@ -132,6 +132,9 @@ class Reactor(Wrapper): self.start() while self.process(): pass self.stop() + self.process() + self.global_handler = None + self.handler = None def wakeup(self): n = pn_reactor_wakeup(self._impl) @@ -159,8 +162,6 @@ class Reactor(Wrapper): def stop(self): pn_reactor_stop(self._impl) self._check_errors() - self.global_handler = None - self.handler = None def schedule(self, delay, task): impl = _chandler(task, self.on_error) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b822c1/proton-c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c index 3397e40..31cce08 100644 --- a/proton-c/src/reactor/reactor.c +++ b/proton-c/src/reactor/reactor.c @@ -51,6 +51,7 @@ struct pn_reactor_t { int selectables; int timeout; bool yield; + bool stop; }; pn_timestamp_t pn_reactor_mark(pn_reactor_t *reactor) { @@ -79,6 +80,7 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) { reactor->selectables = 0; reactor->timeout = 0; reactor->yield = false; + reactor->stop = false; pn_reactor_mark(reactor); } @@ -406,14 +408,21 @@ bool pn_reactor_process(pn_reactor_t *reactor) { previous = reactor->previous = type; pn_decref(event); pn_collector_pop(reactor->collector); - } else if (pni_reactor_more(reactor)) { + } else if (!reactor->stop && pni_reactor_more(reactor)) { if (previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) { pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_QUIESCED); } else { return true; } } else { - return false; + if (reactor->selectable) { + pn_selectable_terminate(reactor->selectable); + pn_reactor_update(reactor, reactor->selectable); + reactor->selectable = NULL; + } else { + pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_FINAL); + return false; + } } } } @@ -458,19 +467,11 @@ void pn_reactor_start(pn_reactor_t *reactor) { assert(reactor); pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_INIT); reactor->selectable = pni_timer_selectable(reactor); - } +} void pn_reactor_stop(pn_reactor_t *reactor) { assert(reactor); - if (reactor->selectable) { - pn_selectable_terminate(reactor->selectable); - pn_reactor_update(reactor, reactor->selectable); - reactor->selectable = NULL; - } - pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_FINAL); - // XXX: should consider removing this from stop to avoid reentrance - pn_reactor_process(reactor); - pn_collector_release(reactor->collector); + reactor->stop = true; } void pn_reactor_run(pn_reactor_t *reactor) { @@ -478,5 +479,6 @@ void pn_reactor_run(pn_reactor_t *reactor) { pn_reactor_set_timeout(reactor, 3141); pn_reactor_start(reactor); while (pn_reactor_process(reactor)) {} - pn_reactor_stop(reactor); + pn_reactor_process(reactor); + pn_collector_release(reactor->collector); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b822c1/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index 5949ae8..7448648 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -64,6 +64,7 @@ public class ReactorImpl implements Reactor, Extendable { private Set<ReactorChild> children; private int selectables; private boolean yield; + private boolean stop; private Selectable selectable; private EventType previous; private Timer timer; @@ -283,7 +284,7 @@ public class ReactorImpl implements Reactor, Extendable { collector.pop(); } else { - if (more()) { + if (!stop && more()) { if (previous != Type.REACTOR_QUIESCED && this.previous != Type.REACTOR_FINAL) { collector.put(Type.REACTOR_QUIESCED, this); } else { @@ -295,6 +296,7 @@ public class ReactorImpl implements Reactor, Extendable { update(selectable); selectable = null; } else { + collector.put(Type.REACTOR_FINAL, this); return false; } } @@ -326,10 +328,7 @@ public class ReactorImpl implements Reactor, Extendable { @Override public void stop() throws HandlerException { - collector.put(Type.REACTOR_FINAL, this); - // (Comment from C code) XXX: should consider removing this from stop to avoid reentrance - process(); - collector = null; + stop = true; } private boolean more() { @@ -342,6 +341,8 @@ public class ReactorImpl implements Reactor, Extendable { start(); while(process()) {} stop(); + process(); + collector = null; } // pn_reactor_schedule from reactor.c --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
