added timer events; made tests not use default ports
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/23dc0a61 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/23dc0a61 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/23dc0a61 Branch: refs/heads/master Commit: 23dc0a6188070845d37e82ef673fbdddf0994f34 Parents: e2f5844 Author: Rafael Schloming <[email protected]> Authored: Sun Jan 11 08:43:46 2015 -0500 Committer: Rafael Schloming <[email protected]> Committed: Sun Jan 11 08:43:46 2015 -0500 ---------------------------------------------------------------------- proton-c/CMakeLists.txt | 1 + proton-c/include/proton/cid.h | 2 + proton-c/include/proton/event.h | 5 ++ proton-c/include/proton/reactor.h | 13 ++- proton-c/src/events/event.c | 2 + proton-c/src/reactor/acceptor.c | 9 +-- proton-c/src/reactor/connection.c | 6 +- proton-c/src/reactor/reactor.c | 86 +++++++++++++++++--- proton-c/src/reactor/reactor.h | 30 +++++++ proton-c/src/reactor/timer.c | 144 +++++++++++++++++++++++++++++++++ proton-c/src/selectable.c | 4 +- proton-c/src/tests/reactor.c | 39 ++++++++- 12 files changed, 314 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index 35df99f..6a7feed 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -308,6 +308,7 @@ set (qpid-proton-core src/reactor/handler.c src/reactor/connection.c src/reactor/acceptor.c + src/reactor/timer.c src/handlers/handshaker.c src/handlers/flowcontroller.c http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/include/proton/cid.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cid.h b/proton-c/include/proton/cid.h index ca6172f..1e4715f 100644 --- a/proton-c/include/proton/cid.h +++ b/proton-c/include/proton/cid.h @@ -49,6 +49,8 @@ typedef enum { CID_pn_reactor, CID_pn_handler, + CID_pn_timer, + CID_pn_task, CID_pn_io, CID_pn_selector, http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/include/proton/event.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h index 28c3313..60bce86 100644 --- a/proton-c/include/proton/event.h +++ b/proton-c/include/proton/event.h @@ -98,6 +98,11 @@ typedef enum { PN_REACTOR_FINAL, /** + * A timer event has occurred. + */ + PN_TIMER, + + /** * The connection has been created. This is the first event that * will ever be issued for a connection. Events of this type point * to the relevant connection. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/include/proton/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h index f75e0aa..e5d953f 100644 --- a/proton-c/include/proton/reactor.h +++ b/proton-c/include/proton/reactor.h @@ -44,6 +44,8 @@ extern "C" { typedef struct pn_handler_t pn_handler_t; typedef struct pn_reactor_t pn_reactor_t; typedef struct pn_acceptor_t pn_acceptor_t; +typedef struct pn_timer_t pn_timer_t; +typedef struct pn_task_t pn_task_t; PN_EXTERN pn_handler_t *pn_handler(void (*dispatch)(pn_handler_t *, pn_event_t *)); PN_EXTERN pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *, pn_event_t *), size_t size, @@ -70,11 +72,18 @@ PN_EXTERN void pn_reactor_start(pn_reactor_t *reactor); PN_EXTERN bool pn_reactor_work(pn_reactor_t *reactor, int timeout); PN_EXTERN void pn_reactor_stop(pn_reactor_t *reactor); PN_EXTERN void pn_reactor_run(pn_reactor_t *reactor); +PN_EXTERN pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *handler); + PN_EXTERN void pn_acceptor_close(pn_reactor_t *reactor, pn_acceptor_t *acceptor); -PN_EXTERN extern void *pni_handler; -#define PN_HANDLER ((pn_handle_t) &pni_handler) +PN_EXTERN pn_timer_t *pn_timer(pn_collector_t *collector); +PN_EXTERN pn_timestamp_t pn_timer_deadline(pn_timer_t *timer); +PN_EXTERN void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now); +PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadline); +PN_EXTERN int pn_timer_tasks(pn_timer_t *timer); + +PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task); /** @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/src/events/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c index f90c2cd..2fa4e06 100644 --- a/proton-c/src/events/event.c +++ b/proton-c/src/events/event.c @@ -236,6 +236,8 @@ const char *pn_event_type_name(pn_event_type_t type) return "PN_REACTOR_INIT"; case PN_REACTOR_FINAL: return "PN_REACTOR_FINAL"; + case PN_TIMER: + return "PN_TIMER"; case PN_CONNECTION_INIT: return "PN_CONNECTION_INIT"; case PN_CONNECTION_BOUND: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/src/reactor/acceptor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c index 8e49772..889016c 100644 --- a/proton-c/src/reactor/acceptor.c +++ b/proton-c/src/reactor/acceptor.c @@ -20,10 +20,10 @@ */ #include <proton/io.h> -#include <proton/reactor.h> #include <proton/sasl.h> #include <proton/selector.h> #include <proton/transport.h> +#include "reactor.h" #include "selectable.h" static ssize_t pni_acceptor_capacity(pn_selectable_t *sel) { @@ -36,8 +36,7 @@ void pni_acceptor_readable(pn_selectable_t *sel) { pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); char name[1024]; pn_socket_t sock = pn_accept(pn_reactor_io(reactor), pn_selectable_fd(sel), name, 1024); - pn_record_t *record = pn_selectable_attachments(sel); - pn_handler_t *handler = (pn_handler_t *) pn_record_get(record, PN_HANDLER); + pn_handler_t *handler = pni_record_get_handler(pn_selectable_attachments(sel)); if (!handler) { handler = pn_reactor_handler(reactor); } pn_connection_t *conn = pn_reactor_connection(reactor, handler); pn_transport_t *trans = pn_transport(); @@ -64,9 +63,7 @@ pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, cons pn_socket_t socket = pn_listen(pn_reactor_io(reactor), host, port); pni_selectable_set_fd(sel, socket); pni_selectable_set_context(sel, reactor); - pn_record_t *record = pn_selectable_attachments(sel); - pn_record_def(record, PN_HANDLER, PN_OBJECT); - pn_record_set(record, PN_HANDLER, handler); + pni_record_init_handler(pn_selectable_attachments(sel), handler); pn_reactor_update(reactor, sel); return (pn_acceptor_t *) sel; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/src/reactor/connection.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c index 7bce15a..ca67cc2 100644 --- a/proton-c/src/reactor/connection.c +++ b/proton-c/src/reactor/connection.c @@ -21,13 +21,13 @@ #include <proton/connection.h> #include <proton/object.h> -#include <proton/reactor.h> #include <proton/sasl.h> #include <proton/transport.h> #include <assert.h> #include <stdio.h> #include <string.h> #include "selectable.h" +#include "reactor.h" // XXX: overloaded for both directions static void *pni_transportctx = NULL; @@ -202,9 +202,7 @@ pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socke pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler) { assert(reactor); pn_connection_t *connection = pn_connection(); - pn_record_t *record = pn_connection_attachments(connection); - pn_record_def(record, PN_HANDLER, PN_OBJECT); - pn_record_set(record, PN_HANDLER, handler); + pni_record_init_handler(pn_connection_attachments(connection), handler); pn_connection_collect(connection, pn_reactor_collector(reactor)); pn_list_add(pn_reactor_children(reactor), connection); pn_decref(connection); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c index 141c2a7..8487822 100644 --- a/proton-c/src/reactor/reactor.c +++ b/proton-c/src/reactor/reactor.c @@ -33,6 +33,7 @@ #include <assert.h> #include "selectable.h" +#include "platform.h" struct pn_reactor_t { pn_record_t *attachments; @@ -41,10 +42,11 @@ struct pn_reactor_t { pn_collector_t *collector; pn_handler_t *handler; pn_list_t *children; + pn_selectable_t *timer; + pn_timestamp_t now; + bool selected; }; -void *pni_handler = NULL; - static void pn_dummy_dispatch(pn_handler_t *handler, pn_event_t *event) { /*pn_string_t *str = pn_string(NULL); pn_inspect(event, str); @@ -52,18 +54,19 @@ static void pn_dummy_dispatch(pn_handler_t *handler, pn_event_t *event) { pn_free(str);*/ } -static void pn_reactor_initialize(void *object) { - pn_reactor_t *reactor = (pn_reactor_t *) object; +static void pn_reactor_initialize(pn_reactor_t *reactor) { reactor->attachments = pn_record(); reactor->io = pn_io(); reactor->selector = pn_io_selector(reactor->io); reactor->collector = pn_collector(); reactor->handler = pn_handler(pn_dummy_dispatch); reactor->children = pn_list(PN_OBJECT, 0); + reactor->timer = NULL; + reactor->now = pn_i_now(); + reactor->selected = false; } -static void pn_reactor_finalize(void *object) { - pn_reactor_t *reactor = (pn_reactor_t *) object; +static void pn_reactor_finalize(pn_reactor_t *reactor) { pn_decref(reactor->attachments); pn_decref(reactor->selector); pn_decref(reactor->io); @@ -76,9 +79,42 @@ static void pn_reactor_finalize(void *object) { #define pn_reactor_compare NULL #define pn_reactor_inspect NULL +pn_timer_t *pni_timer(pn_selectable_t *sel) { + pn_record_t *record = pn_selectable_attachments(sel); + return (pn_timer_t *) pn_record_get(record, 0x1); +} + +static pn_timestamp_t pni_timer_deadline(pn_selectable_t *sel) { + pn_timer_t *timer = pni_timer(sel); + return pn_timer_deadline(timer); +} + +static void pni_timer_expired(pn_selectable_t *sel) { + pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); + pn_timer_t *timer = pni_timer(sel); + pn_timer_tick(timer, reactor->now); +} + +pn_selectable_t *pni_selectable_timer(pn_reactor_t *reactor) { + pn_selectable_t *sel = pn_reactor_selectable(reactor); + pn_selectable_set_deadline(sel, pni_timer_deadline); + pn_selectable_set_expired(sel, pni_timer_expired); + pni_selectable_set_context(sel, reactor); + pn_record_t *record = pn_selectable_attachments(sel); + pn_record_def(record, 0x1, PN_OBJECT); + pn_timer_t *timer = pn_timer(reactor->collector); + pn_record_set(record, 0x1, timer); + pn_decref(timer); + pn_reactor_update(reactor, sel); + return sel; +} + +PN_CLASSDEF(pn_reactor) + pn_reactor_t *pn_reactor() { - static const pn_class_t clazz = PN_CLASS(pn_reactor); - return (pn_reactor_t *) pn_class_new(&clazz, sizeof(pn_reactor_t)); + pn_reactor_t *reactor = pn_reactor_new(); + reactor->timer = pni_selectable_timer(reactor); + return reactor; } pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor) { @@ -154,10 +190,18 @@ static void pni_reactor_dispatch(pn_reactor_t *reactor, pn_event_t *event) { } } +static void *pni_handler = NULL; +#define PN_HANDLER ((pn_handle_t) &pni_handler) + pn_handler_t *pni_record_get_handler(pn_record_t *record) { return (pn_handler_t *) pn_record_get(record, PN_HANDLER); } +void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler) { + pn_record_def(record, PN_HANDLER, PN_OBJECT); + pn_record_set(record, PN_HANDLER, handler); +} + pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler) { pn_handler_t *handler = NULL; pn_link_t *link = pn_event_link(event); @@ -175,9 +219,21 @@ pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler) handler = pni_record_get_handler(pn_connection_attachments(connection)); if (handler) { return handler; } } + if (pn_class_id(pn_event_class(event)) == CID_pn_task) { + handler = pni_record_get_handler(pn_task_attachments((pn_task_t *) pn_event_context(event))); + if (handler) { return handler; } + } return default_handler; } +pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *handler) { + pn_timer_t *timer = pni_timer(reactor->timer); + pn_task_t *task = pn_timer_schedule(timer, reactor->now + delay); + pni_record_init_handler(pn_task_attachments(task), handler); + pn_reactor_update(reactor, reactor->timer); + return task; +} + void pn_reactor_process(pn_reactor_t *reactor) { assert(reactor); pn_event_t *event; @@ -196,10 +252,18 @@ void pn_reactor_start(pn_reactor_t *reactor) { bool pn_reactor_work(pn_reactor_t *reactor, int timeout) { assert(reactor); + reactor->now = pn_i_now(); pn_reactor_process(reactor); - if (!pn_selector_size(reactor->selector)) { - return false; + if (pn_selector_size(reactor->selector) == 1) { + if (reactor->selected) { + pn_timer_t *timer = pni_timer(reactor->timer); + if (!pn_timer_tasks(timer)) { + return false; + } + } else { + timeout = 0; + } } pn_selector_select(reactor->selector, timeout); @@ -221,6 +285,8 @@ bool pn_reactor_work(pn_reactor_t *reactor, int timeout) { } } + reactor->selected = true; + return true; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/src/reactor/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.h b/proton-c/src/reactor/reactor.h new file mode 100644 index 0000000..e21274d --- /dev/null +++ b/proton-c/src/reactor/reactor.h @@ -0,0 +1,30 @@ +#ifndef _PROTON_SRC_REACTOR_H +#define _PROTON_SRC_REACTOR_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/reactor.h> + +pn_handler_t *pni_record_get_handler(pn_record_t *record); +void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler); + +#endif /* src/reactor.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/src/reactor/timer.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c new file mode 100644 index 0000000..fd4b39e --- /dev/null +++ b/proton-c/src/reactor/timer.c @@ -0,0 +1,144 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/object.h> +#include <proton/reactor.h> +#include <assert.h> + +struct pn_task_t { + pn_list_t *pool; + pn_record_t *attachments; + pn_timestamp_t deadline; +}; + +void pn_task_initialize(pn_task_t *task) { + task->pool = NULL; + task->attachments = pn_record(); + task->deadline = 0; +} + +void pn_task_finalize(pn_task_t *task) { + // if we are the last reference to the pool then don't put ourselves + // into it + if (task->pool && pn_refcount(task->pool) > 1) { + pn_record_clear(task->attachments); + pn_list_add(task->pool, task); + pn_decref(task->pool); + task->pool = NULL; + } else { + pn_decref(task->pool); + pn_decref(task->attachments); + } +} + +intptr_t pn_task_compare(pn_task_t *a, pn_task_t *b) { + return a->deadline - b->deadline; +} + +#define pn_task_inspect NULL +#define pn_task_hashcode NULL + +PN_CLASSDEF(pn_task) + +pn_task_t *pn_task(void) { + pn_task_t *task = pn_task_new(); + return task; +} + +pn_record_t *pn_task_attachments(pn_task_t *task) { + assert(task); + return task->attachments; +} + +// +// timer +// + +struct pn_timer_t { + pn_list_t *pool; + pn_list_t *tasks; + pn_collector_t *collector; +}; + +static void pn_timer_initialize(pn_timer_t *timer) { + timer->pool = pn_list(PN_OBJECT, 0); + timer->tasks = pn_list(PN_OBJECT, 0); +} + +static void pn_timer_finalize(pn_timer_t *timer) { + pn_decref(timer->pool); + pn_free(timer->tasks); +} + +#define pn_timer_inspect NULL +#define pn_timer_compare NULL +#define pn_timer_hashcode NULL + +PN_CLASSDEF(pn_timer) + +pn_timer_t *pn_timer(pn_collector_t *collector) { + pn_timer_t *timer = pn_timer_new(); + timer->collector = collector; + return timer; +} + +pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadline) { + pn_task_t *task = (pn_task_t *) pn_list_pop(timer->pool); + if (!task) { + task = pn_task(); + } + task->pool = timer->pool; + pn_incref(task->pool); + task->deadline = deadline; + pn_list_minpush(timer->tasks, task); + pn_decref(task); + return task; +} + +pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) { + assert(timer); + if (pn_list_size(timer->tasks)) { + pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0); + return task->deadline; + } else { + return 0; + } +} + +void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) { + assert(timer); + while (pn_list_size(timer->tasks)) { + pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0); + if (now >= task->deadline) { + pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks); + assert(min == task); + pn_collector_put(timer->collector, PN_OBJECT, task, PN_TIMER); + pn_decref(task); + } else { + break; + } + } +} + +int pn_timer_tasks(pn_timer_t *timer) { + assert(timer); + return pn_list_size(timer->tasks); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/src/selectable.c ---------------------------------------------------------------------- diff --git a/proton-c/src/selectable.c b/proton-c/src/selectable.c index a1759d3..b5b4426 100644 --- a/proton-c/src/selectable.c +++ b/proton-c/src/selectable.c @@ -75,7 +75,9 @@ void pn_selectable_initialize(void *obj) void pn_selectable_finalize(void *obj) { pn_selectable_t *sel = (pn_selectable_t *) obj; - sel->finalize(sel); + if (sel->finalize) { + sel->finalize(sel); + } pn_free(sel->context); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/23dc0a61/proton-c/src/tests/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c index 36a2686..8455fa8 100644 --- a/proton-c/src/tests/reactor.c +++ b/proton-c/src/tests/reactor.c @@ -159,7 +159,7 @@ static void test_reactor_connection(void) { static void test_reactor_acceptor(void) { pn_reactor_t *reactor = pn_reactor(); assert(reactor); - pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5672", NULL); + pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5678", NULL); assert(acceptor); pn_reactor_free(reactor); } @@ -193,7 +193,7 @@ static void test_reactor_acceptor_run(void) { assert(reactor); pn_handler_t *root = pn_reactor_handler(reactor); assert(root); - pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5672", NULL); + pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5678", NULL); assert(acceptor); pn_handler_add(root, tra_handler(acceptor)); pn_reactor_run(reactor); @@ -241,7 +241,7 @@ static void client_dispatch(pn_handler_t *handler, pn_event_t *event) { pn_connection_t *conn = pn_event_connection(event); switch (pn_event_type(event)) { case PN_CONNECTION_INIT: - pn_connection_set_hostname(conn, "127.0.0.1:5672"); + pn_connection_set_hostname(conn, "127.0.0.1:5678"); pn_connection_open(conn); break; case PN_CONNECTION_REMOTE_OPEN: @@ -259,7 +259,7 @@ static void test_reactor_connect(void) { pn_reactor_t *reactor = pn_reactor(); pn_handler_t *sh = pn_handler_new(server_dispatch, sizeof(server_t), NULL); server_t *srv = smem(sh); - pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5672", sh); + pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5678", sh); srv->reactor = reactor; srv->acceptor = acceptor; srv->events = pn_list(PN_VOID, 0); @@ -393,6 +393,35 @@ static void test_reactor_transfer(int count, int window) { pn_handler_free(ch); } +static void test_reactor_schedule(void) { + pn_reactor_t *reactor = pn_reactor(); + pn_handler_t *root = pn_reactor_handler(reactor); + pn_list_t *events = pn_list(PN_VOID, 0); + pn_handler_add(root, test_handler(events)); + pn_reactor_schedule(reactor, 0, NULL); + pn_reactor_run(reactor); + pn_reactor_free(reactor); + expect(events, PN_REACTOR_INIT, PN_TIMER, PN_REACTOR_FINAL, END); + pn_free(events); +} + +static void test_reactor_schedule_handler(void) { + pn_reactor_t *reactor = pn_reactor(); + pn_handler_t *root = pn_reactor_handler(reactor); + pn_list_t *events = pn_list(PN_VOID, 0); + pn_list_t *tevents = pn_list(PN_VOID, 0); + pn_handler_add(root, test_handler(events)); + pn_handler_t *th = test_handler(tevents); + pn_reactor_schedule(reactor, 0, th); + pn_reactor_run(reactor); + pn_reactor_free(reactor); + pn_handler_free(th); + expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END); + expect(tevents, PN_TIMER, END); + pn_free(events); + pn_free(tevents); +} + int main(int argc, char **argv) { test_reactor(); @@ -410,5 +439,7 @@ int main(int argc, char **argv) } test_reactor_transfer(1024, 64); test_reactor_transfer(4*1024, 1024); + test_reactor_schedule(); + test_reactor_schedule_handler(); return 0; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
