Repository: qpid-proton Updated Branches: refs/heads/master 95d04400d -> f023c63b3
factored I/O code out into a separate handler and added bindings to allow pure python I/O Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f023c63b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f023c63b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f023c63b Branch: refs/heads/master Commit: f023c63b3af26b27506fd2219811946e65a03e44 Parents: 95d0440 Author: Rafael Schloming <[email protected]> Authored: Wed Jan 21 12:19:31 2015 -0500 Committer: Rafael Schloming <[email protected]> Committed: Wed Jan 21 12:19:31 2015 -0500 ---------------------------------------------------------------------- proton-c/CMakeLists.txt | 1 + proton-c/bindings/python/proton/__init__.py | 25 +++--- proton-c/bindings/python/proton/reactors.py | 40 ++++++++-- proton-c/include/proton/handlers.h | 2 + proton-c/include/proton/reactor.h | 4 + proton-c/src/handlers/iohandler.c | 96 ++++++++++++++++++++++++ proton-c/src/reactor/reactor.c | 94 ++++++++++------------- proton-c/src/tests/reactor.c | 5 +- 8 files changed, 193 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index 6a7feed..3dd5b86 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -311,6 +311,7 @@ set (qpid-proton-core src/reactor/timer.c src/handlers/handshaker.c + src/handlers/iohandler.c src/handlers/flowcontroller.c src/messenger/messenger.c http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py index 22ffa6d..17cc670 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -33,7 +33,7 @@ The proton APIs consist of the following classes: from cproton import * from wrapper import Wrapper -import weakref, re, socket +import weakref, re, socket, sys try: import uuid except ImportError: @@ -3448,15 +3448,22 @@ class Handler(object): class _cadapter: - def __init__(self, handler): + def __init__(self, handler, errors=None): self.handler = handler + self.errors = errors def __call__(self, cevent): ev = Event.wrap(cevent) - ev.dispatch(self.handler) - if hasattr(self.handler, "handlers"): - for h in self.handler.handlers: - ev.dispatch(h) + try: + ev.dispatch(self.handler) + if hasattr(self.handler, "handlers"): + for h in self.handler.handlers: + ev.dispatch(h) + except: + if self.errors is None: + raise + else: + self.errors.append(sys.exc_info()) class WrappedHandler(Wrapper): @@ -3464,11 +3471,11 @@ class WrappedHandler(Wrapper): Wrapper.__init__(self, impl_or_constructor) def add(self, handler): - impl = _chandler(handler) + impl = _chandler(handler, getattr(self, "errors", None)) pn_handler_add(self._impl, impl) pn_decref(impl) -def _chandler(obj): +def _chandler(obj, errors=None): if obj is None: return None elif isinstance(obj, WrappedHandler): @@ -3476,7 +3483,7 @@ def _chandler(obj): pn_incref(impl) return impl else: - return pn_pyhandler(_cadapter(obj)) + return pn_pyhandler(_cadapter(obj, errors)) ### # Driver http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/bindings/python/proton/reactors.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py index 612c38b..493af71 100644 --- a/proton-c/bindings/python/proton/reactors.py +++ b/proton-c/bindings/python/proton/reactors.py @@ -835,7 +835,8 @@ class Container(object): def do_work(self, timeout=None): return self.loop.do_work(timeout) -from proton import WrappedHandler, _chandler, Connection, secs2millis, Selectable +import traceback +from proton import WrappedHandler, _chandler, Connection, secs2millis, millis2secs, Selectable from wrapper import Wrapper from cproton import * @@ -862,6 +863,11 @@ class Acceptor(Wrapper): def close(self): pn_acceptor_close(self._impl) +def _wrap_handler(reactor, impl): + wrapped = WrappedHandler(impl) + wrapped.__dict__["errors"] = reactor.errors + return wrapped + class Reactor(Wrapper): @staticmethod @@ -877,31 +883,51 @@ class Reactor(Wrapper): self.handler.add(h) def _init(self): - pass + self.errors = [] + + def global_(self, handler): + impl = _chandler(handler, self.errors) + pn_reactor_global(self._impl, impl) + pn_decref(impl) + + @property + def timeout(self): + return millis2secs(pn_reactor_timeout(self._impl)) + + def yield_(self): + pn_reactor_yield(self._impl) + + def mark(self): + pn_reactor_mark(self._impl) @property def handler(self): - return WrappedHandler(pn_reactor_handler(self._impl)) + return _wrap_handler(self, pn_reactor_handler(self._impl)) def run(self): pn_reactor_start(self._impl) - while pn_reactor_work(self._impl, 3142): pass + while pn_reactor_work(self._impl, 3142): + if self.errors: + for exc, value, tb in self.errors[:-1]: + traceback.print_exception(exc, value, tb) + exc, value, tb = self.errors[-1] + raise exc, value, tb pn_reactor_stop(self._impl) def schedule(self, delay, task): - impl = _chandler(task) + impl = _chandler(task, self.errors) task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) pn_decref(impl) return task def acceptor(self, host, port, handler=None): - impl = _chandler(handler) + impl = _chandler(handler, self.errors) result = Acceptor(pn_reactor_acceptor(self._impl, host, port, impl)) pn_decref(impl) return result def connection(self, handler=None): - impl = _chandler(handler) + impl = _chandler(handler, self.errors) result = Connection.wrap(pn_reactor_connection(self._impl, impl)) pn_decref(impl) return result http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/include/proton/handlers.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/handlers.h b/proton-c/include/proton/handlers.h index 99242d1..304d0e6 100644 --- a/proton-c/include/proton/handlers.h +++ b/proton-c/include/proton/handlers.h @@ -41,9 +41,11 @@ extern "C" { */ typedef pn_handler_t pn_handshaker_t; +typedef pn_handler_t pn_iohandler_t; typedef pn_handler_t pn_flowcontroller_t; PN_EXTERN pn_handshaker_t *pn_handshaker(void); +PN_EXTERN pn_iohandler_t *pn_iohandler(void); PN_EXTERN pn_flowcontroller_t *pn_flowcontroller(int window); /** @} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/include/proton/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h index e04ba6d..8447355 100644 --- a/proton-c/include/proton/reactor.h +++ b/proton-c/include/proton/reactor.h @@ -57,8 +57,12 @@ PN_EXTERN void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event); PN_EXTERN pn_reactor_t *pn_reactor(void); PN_EXTERN pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor); +PN_EXTERN int pn_reactor_timeout(pn_reactor_t *reactor); +PN_EXTERN void pn_reactor_mark(pn_reactor_t *reactor); +PN_EXTERN void pn_reactor_yield(pn_reactor_t *reactor); PN_EXTERN void pn_reactor_free(pn_reactor_t *reactor); PN_EXTERN pn_collector_t *pn_reactor_collector(pn_reactor_t *reactor); +PN_EXTERN void pn_reactor_global(pn_reactor_t *reactor, pn_handler_t *handler); PN_EXTERN pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor); PN_EXTERN pn_io_t *pn_reactor_io(pn_reactor_t *reactor); PN_EXTERN pn_list_t *pn_reactor_children(pn_reactor_t *reactor); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/handlers/iohandler.c ---------------------------------------------------------------------- diff --git a/proton-c/src/handlers/iohandler.c b/proton-c/src/handlers/iohandler.c new file mode 100644 index 0000000..02e0d0b --- /dev/null +++ b/proton-c/src/handlers/iohandler.c @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/handlers.h> +#include <proton/selector.h> +#include <assert.h> + +static void *pni_selector_handle = NULL; + +#define PN_SELECTOR ((pn_handle_t) &pni_selector_handle) + +void pni_handle_quiesced(pn_reactor_t *reactor, pn_selector_t *selector) { + pn_selector_select(selector, pn_reactor_timeout(reactor)); + pn_selectable_t *sel; + int events; + pn_reactor_mark(reactor); + while ((sel = pn_selector_next(selector, &events))) { + if (events & PN_READABLE) { + pn_selectable_readable(sel); + } + if (events & PN_WRITABLE) { + pn_selectable_writable(sel); + } + if (events & PN_EXPIRED) { + pn_selectable_expired(sel); + } + } + pn_reactor_yield(reactor); +} + +void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event); +void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event); + +static void pn_iodispatch(pn_iohandler_t *handler, pn_event_t *event) { + pn_reactor_t *reactor = pn_event_reactor(event); + pn_record_t *record = pn_reactor_attachments(reactor); + pn_selector_t *selector = (pn_selector_t *) pn_record_get(record, PN_SELECTOR); + if (!selector) { + selector = pn_io_selector(pn_reactor_io(reactor)); + pn_record_def(record, PN_SELECTOR, PN_OBJECT); + pn_record_set(record, PN_SELECTOR, selector); + pn_decref(selector); + } + switch (pn_event_type(event)) { + case PN_SELECTABLE_INIT: + { + pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event); + pn_selector_add(selector, sel); + } + break; + case PN_SELECTABLE_UPDATED: + { + pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event); + if (pn_selectable_is_terminal(sel)) { + pn_selector_remove(selector, sel); + pn_selectable_release(sel); + } else { + pn_selector_update(selector, sel); + } + } + break; + case PN_CONNECTION_LOCAL_OPEN: + pni_handle_open(reactor, event); + break; + case PN_TRANSPORT: + pni_handle_transport(reactor, event); + break; + case PN_REACTOR_QUIESCED: + pni_handle_quiesced(reactor, selector); + break; + default: + break; + } +} + +pn_iohandler_t *pn_iohandler(void) { + return pn_handler(pn_iodispatch); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c index 4f90865..536bb11 100644 --- a/proton-c/src/reactor/reactor.c +++ b/proton-c/src/reactor/reactor.c @@ -20,8 +20,8 @@ */ #include <proton/object.h> +#include <proton/handlers.h> #include <proton/io.h> -#include <proton/selector.h> #include <proton/event.h> #include <proton/transport.h> #include <proton/connection.h> @@ -39,8 +39,8 @@ struct pn_reactor_t { pn_record_t *attachments; pn_io_t *io; - pn_selector_t *selector; pn_collector_t *collector; + pn_handler_t *global; pn_handler_t *handler; pn_list_t *children; pn_timer_t *timer; @@ -49,9 +49,10 @@ struct pn_reactor_t { pn_timestamp_t now; int selectables; int timeout; + bool yield; }; -static void pn_reactor_mark(pn_reactor_t *reactor) { +void pn_reactor_mark(pn_reactor_t *reactor) { assert(reactor); reactor->now = pn_i_now(); } @@ -59,8 +60,8 @@ static void pn_reactor_mark(pn_reactor_t *reactor) { static void pn_reactor_initialize(pn_reactor_t *reactor) { reactor->attachments = pn_record(); reactor->io = pn_io(); - reactor->selector = pn_io_selector(reactor->io); reactor->collector = pn_collector(); + reactor->global = pn_iohandler(); reactor->handler = pn_handler(NULL); reactor->children = pn_list(PN_OBJECT, 0); reactor->timer = pn_timer(reactor->collector); @@ -68,16 +69,17 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) { reactor->previous = PN_EVENT_NONE; reactor->selectables = 0; reactor->timeout = 0; + reactor->yield = false; pn_reactor_mark(reactor); } static void pn_reactor_finalize(pn_reactor_t *reactor) { pn_decref(reactor->attachments); pn_decref(reactor->collector); + pn_decref(reactor->global); pn_decref(reactor->handler); pn_decref(reactor->children); pn_decref(reactor->timer); - pn_decref(reactor->selector); pn_decref(reactor->io); } @@ -96,6 +98,11 @@ pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor) { return reactor->attachments; } +int pn_reactor_timeout(pn_reactor_t *reactor) { + assert(reactor); + return reactor->timeout; +} + void pn_reactor_free(pn_reactor_t *reactor) { if (reactor) { pn_collector_release(reactor->collector); @@ -105,6 +112,13 @@ void pn_reactor_free(pn_reactor_t *reactor) { } } +void pn_reactor_global(pn_reactor_t *reactor, pn_handler_t *handler) { + assert(reactor); + pn_decref(reactor->global); + reactor->global = handler; + pn_incref(reactor->global); +} + pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor) { assert(reactor); return reactor->handler; @@ -150,28 +164,6 @@ void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable) { pn_collector_put(reactor->collector, PN_OBJECT, selectable, PN_SELECTABLE_UPDATED); } -void pni_handle_quiesced(pn_reactor_t *reactor) { - pn_selector_select(reactor->selector, reactor->timeout); - pn_selectable_t *sel; - int events; - pn_reactor_mark(reactor); - while ((sel = pn_selector_next(reactor->selector, &events))) { - if (events & PN_READABLE) { - pn_selectable_readable(sel); - } - if (events & PN_WRITABLE) { - pn_selectable_writable(sel); - } - if (events & PN_EXPIRED) { - pn_selectable_expired(sel); - } - } -} - -void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event); -void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event); -void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event); - static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) { assert(reactor); assert(event); @@ -184,39 +176,15 @@ static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) { } } +void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event); + static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event) { assert(reactor); assert(event); switch (pn_event_type(event)) { - case PN_SELECTABLE_INIT: - { - pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event); - pn_selector_add(reactor->selector, sel); - } - break; - case PN_SELECTABLE_UPDATED: - { - pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event); - if (pn_selectable_is_terminal(sel)) { - pn_selector_remove(reactor->selector, sel); - pn_selectable_release(sel); - } else { - pn_selector_update(reactor->selector, sel); - } - } - break; - case PN_TRANSPORT: - pni_handle_transport(reactor, event); - break; - case PN_CONNECTION_LOCAL_OPEN: - pni_handle_open(reactor, event); - break; case PN_CONNECTION_FINAL: pni_handle_final(reactor, event); break; - case PN_REACTOR_QUIESCED: - pni_handle_quiesced(reactor); - break; default: break; } @@ -344,22 +312,34 @@ bool pni_reactor_more(pn_reactor_t *reactor) { return pn_timer_tasks(reactor->timer) || reactor->selectables > 1; } +void pn_reactor_yield(pn_reactor_t *reactor) { + assert(reactor); + reactor->yield = true; +} + bool pn_reactor_process(pn_reactor_t *reactor) { assert(reactor); pn_reactor_mark(reactor); + pn_event_type_t previous = PN_EVENT_NONE; while (true) { pn_event_t *event = pn_collector_peek(reactor->collector); - // pni_event_print(event); + //pni_event_print(event); if (event) { + if (reactor->yield) { + reactor->yield = false; + return true; + } + reactor->yield = false; pni_reactor_dispatch_pre(reactor, event); pn_handler_t *handler = pn_event_handler(event, reactor->handler); pn_handler_dispatch(handler, event); + pn_handler_dispatch(reactor->global, event); pni_reactor_dispatch_post(reactor, event); - reactor->previous = pn_event_type(event); + previous = reactor->previous = pn_event_type(event); pn_collector_pop(reactor->collector); } else { if (pni_reactor_more(reactor)) { - if (reactor->previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) { + if (previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) { pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_QUIESCED); } else { return true; @@ -380,6 +360,8 @@ bool pn_reactor_process(pn_reactor_t *reactor) { static void pni_timer_expired(pn_selectable_t *sel) { pn_reactor_t *reactor = pni_reactor(sel); pn_timer_tick(reactor->timer, reactor->now); + pn_selectable_set_deadline(sel, pn_timer_deadline(reactor->timer)); + pn_reactor_update(reactor, sel); } pn_selectable_t *pni_timer_selectable(pn_reactor_t *reactor) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f023c63b/proton-c/src/tests/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c index ff83fa7..5c30b1e 100644 --- a/proton-c/src/tests/reactor.c +++ b/proton-c/src/tests/reactor.c @@ -420,7 +420,8 @@ static void test_reactor_schedule(void) { pn_reactor_run(reactor); pn_reactor_free(reactor); expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED, - PN_TIMER_TASK, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); + PN_TIMER_TASK, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, + END); pn_free(events); } @@ -436,7 +437,7 @@ static void test_reactor_schedule_handler(void) { pn_reactor_free(reactor); pn_handler_free(th); expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED, - PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); + PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); expect(tevents, PN_TIMER_TASK, END); pn_free(events); pn_free(tevents); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
