Repository: qpid-proton Updated Branches: refs/heads/master 6df8ad351 -> 95d04400d
re-factored select as processing of the quesced event Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/95d04400 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/95d04400 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/95d04400 Branch: refs/heads/master Commit: 95d04400db75759a92a8618971f0b8b52070427e Parents: 255946d Author: Rafael Schloming <[email protected]> Authored: Wed Jan 21 09:03:19 2015 -0500 Committer: Rafael Schloming <[email protected]> Committed: Wed Jan 21 09:03:44 2015 -0500 ---------------------------------------------------------------------- proton-c/src/reactor/reactor.c | 116 ++++++++++++++++++------------------ proton-c/src/tests/reactor.c | 18 +++--- 2 files changed, 67 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/95d04400/proton-c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c index a4f4b31..4f90865 100644 --- a/proton-c/src/reactor/reactor.c +++ b/proton-c/src/reactor/reactor.c @@ -47,7 +47,8 @@ struct pn_reactor_t { pn_selectable_t *selectable; pn_event_type_t previous; pn_timestamp_t now; - bool selected; + int selectables; + int timeout; }; static void pn_reactor_mark(pn_reactor_t *reactor) { @@ -55,24 +56,18 @@ static void pn_reactor_mark(pn_reactor_t *reactor) { reactor->now = pn_i_now(); } -static void pn_dummy_dispatch(pn_handler_t *handler, pn_event_t *event) { - /*pn_string_t *str = pn_string(NULL); - pn_inspect(event, str); - printf("%s\n", pn_string_get(str)); - pn_free(str);*/ -} - static void pn_reactor_initialize(pn_reactor_t *reactor) { reactor->attachments = pn_record(); reactor->io = pn_io(); reactor->selector = pn_io_selector(reactor->io); reactor->collector = pn_collector(); - reactor->handler = pn_handler(pn_dummy_dispatch); + reactor->handler = pn_handler(NULL); reactor->children = pn_list(PN_OBJECT, 0); reactor->timer = pn_timer(reactor->collector); reactor->selectable = NULL; reactor->previous = PN_EVENT_NONE; - reactor->selected = false; + reactor->selectables = 0; + reactor->timeout = 0; pn_reactor_mark(reactor); } @@ -115,11 +110,6 @@ pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor) { return reactor->handler; } -pn_selector_t *pn_reactor_selector(pn_reactor_t *reactor) { - assert(reactor); - return reactor->selector; -} - pn_io_t *pn_reactor_io(pn_reactor_t *reactor) { assert(reactor); return reactor->io; @@ -139,6 +129,7 @@ static void pni_selectable_release(pn_selectable_t *selectable) { pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(selectable); pn_collector_put(reactor->collector, PN_OBJECT, selectable, PN_SELECTABLE_FINAL); pn_list_remove(reactor->children, selectable); + reactor->selectables--; } pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) { @@ -150,6 +141,7 @@ pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) { pn_list_add(reactor->children, sel); pn_selectable_on_release(sel, pni_selectable_release); pn_decref(sel); + reactor->selectables++; return sel; } @@ -158,6 +150,24 @@ void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable) { pn_collector_put(reactor->collector, PN_OBJECT, selectable, PN_SELECTABLE_UPDATED); } +void pni_handle_quiesced(pn_reactor_t *reactor) { + pn_selector_select(reactor->selector, reactor->timeout); + pn_selectable_t *sel; + int events; + pn_reactor_mark(reactor); + while ((sel = pn_selector_next(reactor->selector, &events))) { + if (events & PN_READABLE) { + pn_selectable_readable(sel); + } + if (events & PN_WRITABLE) { + pn_selectable_writable(sel); + } + if (events & PN_EXPIRED) { + pn_selectable_expired(sel); + } + } +} + void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event); void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event); void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event); @@ -204,6 +214,9 @@ static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event) case PN_CONNECTION_FINAL: pni_handle_final(reactor, event); break; + case PN_REACTOR_QUIESCED: + pni_handle_quiesced(reactor); + break; default: break; } @@ -319,11 +332,24 @@ pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *h return task; } -void pn_reactor_process(pn_reactor_t *reactor) { +void pni_event_print(pn_event_t *event) { + pn_string_t *str = pn_string(NULL); + pn_inspect(event, str); + printf("%s\n", pn_string_get(str)); + pn_free(str); +} + +bool pni_reactor_more(pn_reactor_t *reactor) { + assert(reactor); + return pn_timer_tasks(reactor->timer) || reactor->selectables > 1; +} + +bool pn_reactor_process(pn_reactor_t *reactor) { assert(reactor); pn_reactor_mark(reactor); while (true) { pn_event_t *event = pn_collector_peek(reactor->collector); + // pni_event_print(event); if (event) { pni_reactor_dispatch_pre(reactor, event); pn_handler_t *handler = pn_event_handler(event, reactor->handler); @@ -331,10 +357,22 @@ void pn_reactor_process(pn_reactor_t *reactor) { pni_reactor_dispatch_post(reactor, event); reactor->previous = pn_event_type(event); pn_collector_pop(reactor->collector); - } else if (reactor->previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) { - pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_QUIESCED); } else { - break; + if (pni_reactor_more(reactor)) { + if (reactor->previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) { + pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_QUIESCED); + } else { + return true; + } + } else { + if (reactor->selectable) { + pn_selectable_terminate(reactor->selectable); + pn_reactor_update(reactor, reactor->selectable); + reactor->selectable = NULL; + } else { + return false; + } + } } } } @@ -360,44 +398,8 @@ void pn_reactor_start(pn_reactor_t *reactor) { bool pn_reactor_work(pn_reactor_t *reactor, int timeout) { assert(reactor); - pn_reactor_process(reactor); - - if (pn_selector_size(reactor->selector) == 1) { - if (reactor->selected) { - if (!pn_timer_tasks(reactor->timer) && reactor->selectable) { - pn_selectable_terminate(reactor->selectable); - pn_reactor_update(reactor, reactor->selectable); - reactor->selectable = NULL; - return true; - } - } else { - timeout = 0; - } - } - - if (!pn_selector_size(reactor->selector)) { - return false; - } - - pn_selector_select(reactor->selector, timeout); - pn_selectable_t *sel; - int events; - pn_reactor_mark(reactor); - while ((sel = pn_selector_next(reactor->selector, &events))) { - if (events & PN_READABLE) { - pn_selectable_readable(sel); - } - if (events & PN_WRITABLE) { - pn_selectable_writable(sel); - } - if (events & PN_EXPIRED) { - pn_selectable_expired(sel); - } - } - - reactor->selected = true; - - return true; + reactor->timeout = timeout; + return pn_reactor_process(reactor); } void pn_reactor_stop(pn_reactor_t *reactor) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/95d04400/proton-c/src/tests/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c index 1365f88..ff83fa7 100644 --- a/proton-c/src/tests/reactor.c +++ b/proton-c/src/tests/reactor.c @@ -137,8 +137,8 @@ static void test_reactor_handler_run(void) { pn_handler_t *th = test_handler(reactor, events); pn_handler_add(handler, th); pn_reactor_run(reactor); - expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED, - PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED, PN_REACTOR_FINAL, END); + expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, + PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); pn_free(reactor); pn_free(th); pn_free(events); @@ -152,8 +152,8 @@ static void test_reactor_handler_run_free(void) { pn_list_t *events = pn_list(PN_VOID, 0); pn_handler_add(handler, test_handler(reactor, events)); pn_reactor_run(reactor); - expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED, - PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED, PN_REACTOR_FINAL, END); + expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, + PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); pn_reactor_free(reactor); pn_free(events); } @@ -169,8 +169,8 @@ static void test_reactor_connection(void) { pn_list_t *revents = pn_list(PN_VOID, 0); pn_handler_add(root, test_handler(reactor, revents)); pn_reactor_run(reactor); - expect(revents, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED, - PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED, PN_REACTOR_FINAL, END); + expect(revents, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED, + PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); expect(cevents, PN_CONNECTION_INIT, END); pn_reactor_free(reactor); pn_handler_free(tch); @@ -420,8 +420,7 @@ static void test_reactor_schedule(void) { pn_reactor_run(reactor); pn_reactor_free(reactor); expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED, - PN_TIMER_TASK, PN_REACTOR_QUIESCED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED, - PN_REACTOR_FINAL, END); + PN_TIMER_TASK, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); pn_free(events); } @@ -437,8 +436,7 @@ static void test_reactor_schedule_handler(void) { pn_reactor_free(reactor); pn_handler_free(th); expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_REACTOR_QUIESCED, - PN_REACTOR_QUIESCED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED, - PN_REACTOR_FINAL, END); + PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); expect(tevents, PN_TIMER_TASK, END); pn_free(events); pn_free(tevents); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
