Repository: qpid-proton Updated Branches: refs/heads/master b143db26f -> cfa366356
PROTON-1443: c proactor - pn_proactor_set_timeout(0) to mean immediate Also added pn_proactor_cancel_timeout() Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cfa36635 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cfa36635 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cfa36635 Branch: refs/heads/master Commit: cfa3663568e3717e7978a3c0223a8c3b0f945656 Parents: b143db2 Author: Alan Conway <[email protected]> Authored: Wed Mar 22 21:41:19 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Wed Mar 22 21:41:19 2017 -0400 ---------------------------------------------------------------------- proton-c/include/proton/proactor.h | 8 +++++-- proton-c/src/proactor/libuv.c | 40 ++++++++++++++++++++------------- proton-c/src/tests/proactor.c | 14 +++++++++++- 3 files changed, 43 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cfa36635/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h index 185a1af..345e3fc 100644 --- a/proton-c/include/proton/proactor.h +++ b/proton-c/include/proton/proactor.h @@ -164,8 +164,7 @@ PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor); * Note: calling pn_proactor_set_timeout() again before the * PN_PROACTOR_TIMEOUT is delivered will cancel the previous timeout * and deliver an event only after the new - * timeout. `pn_proactor_set_timeout(0)` will cancel the timeout - * without setting a new one. + * timeout. * * Note: PN_PROACTOR_TIMEOUT events will be delivered in series, never * concurrently. @@ -173,6 +172,11 @@ PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor); PNP_EXTERN void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout); /** + * Cancel the pending timeout set by pn_proactor_set_timeout() if there is one. + */ +PNP_EXTERN void pn_proactor_cancel_timeout(pn_proactor_t *proactor); + +/** * Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if * there are no IO events pending for the connection. * http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cfa36635/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index 8fc3eaf..1ba840a 100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -217,6 +217,8 @@ struct pn_listener_t { listener_state state; }; +typedef enum { TM_NONE, TM_REQUEST, TM_PENDING, TM_FIRED } timeout_state_t; + struct pn_proactor_t { /* Leader thread */ uv_cond_t cond; @@ -230,16 +232,15 @@ struct pn_proactor_t { /* Protected by lock */ uv_mutex_t lock; - work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait() */ - work_queue_t leader_q; /* waiting for attention by the leader thread */ - size_t interrupt; /* pending interrupts */ + work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait() */ + work_queue_t leader_q; /* waiting for attention by the leader thread */ + size_t interrupt; /* pending interrupts */ + timeout_state_t timeout_state; pn_millis_t timeout; - size_t count; /* connection/listener count for INACTIVE events */ + size_t count; /* connection/listener count for INACTIVE events */ bool inactive; - bool timeout_request; - bool timeout_elapsed; bool has_leader; - bool batch_working; /* batch is being processed in a worker thread */ + bool batch_working; /* batch is being processed in a worker thread */ }; @@ -736,7 +737,9 @@ static void on_write(uv_write_t* write, int err) { static void on_timeout(uv_timer_t *timer) { pn_proactor_t *p = (pn_proactor_t*)timer->data; uv_mutex_lock(&p->lock); - p->timeout_elapsed = true; + if (p->timeout_state == TM_PENDING) { /* Only fire if still pending */ + p->timeout_state = TM_FIRED; + } uv_stop(&p->loop); /* UV does not always stop after on_timeout without this */ uv_mutex_unlock(&p->lock); } @@ -787,8 +790,8 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) { --p->interrupt; return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT); } - if (p->timeout_elapsed) { - p->timeout_elapsed = false; + if (p->timeout_state == TM_FIRED) { + p->timeout_state = TM_NONE; return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT); } } @@ -883,12 +886,10 @@ void pconnection_detach(pconnection_t *pc) { /* Process the leader_q and the UV loop, in the leader thread */ static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) { /* Set timeout timer if there was a request, let it count down while we process work */ - if (p->timeout_request) { - p->timeout_request = false; + if (p->timeout_state == TM_REQUEST) { + p->timeout_state = TM_PENDING; uv_timer_stop(&p->timer); - if (p->timeout) { - uv_timer_start(&p->timer, on_timeout, p->timeout, 0); - } + uv_timer_start(&p->timer, on_timeout, p->timeout, 0); } pn_event_batch_t *batch = NULL; for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) { @@ -1014,7 +1015,14 @@ void pn_proactor_interrupt(pn_proactor_t *p) { void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { uv_mutex_lock(&p->lock); p->timeout = t; - p->timeout_request = true; + p->timeout_state = TM_REQUEST; + uv_mutex_unlock(&p->lock); + notify(p); +} + +void pn_proactor_cancel_timeout(pn_proactor_t *p) { + uv_mutex_lock(&p->lock); + p->timeout_state = TM_NONE; uv_mutex_unlock(&p->lock); notify(p); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cfa36635/proton-c/src/tests/proactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c index 38237b0..34a1880 100644 --- a/proton-c/src/tests/proactor.c +++ b/proton-c/src/tests/proactor.c @@ -121,8 +121,20 @@ static void test_interrupt_timeout(test_t *t) { pn_proactor_interrupt(p); TEST_ETYPE_EQUAL(t, PN_PROACTOR_INTERRUPT, wait_next(p)); TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */ - pn_proactor_set_timeout(p, 1); /* very short timeout */ + + /* Set an immediate timeout */ + pn_proactor_set_timeout(p, 0); + TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p)); + + /* Set a (very short) timeout */ + pn_proactor_set_timeout(p, 10); TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p)); + + /* Set and cancel a timeout, make sure we don't get the timeout event */ + pn_proactor_set_timeout(p, 10); + pn_proactor_cancel_timeout(p); + TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */ + pn_proactor_free(p); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
