Repository: qpid-proton Updated Branches: refs/heads/master d16794ba0 -> 80def6f9a
PROTON-1460: epoll test fixes: log_event, listener setup, EOS detection Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/80def6f9 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/80def6f9 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/80def6f9 Branch: refs/heads/master Commit: 80def6f9a5ae4c37b69d5da2b6db650ec1b63821 Parents: d16794b Author: Clifford Jansen <[email protected]> Authored: Sat Apr 22 13:49:22 2017 -0700 Committer: Clifford Jansen <[email protected]> Committed: Sat Apr 22 13:49:22 2017 -0700 ---------------------------------------------------------------------- proton-c/src/proactor/epoll.c | 53 +++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/80def6f9/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index 7e2ee2c..126a7e2 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -232,11 +232,16 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) { * connection, each listener, the proactor itself. The serialization * is presented to the application via each associated event batch. * + * Multiple threads can be trying to do work on a single context + * (i.e. socket IO is ready and wakeup at same time). Mutexes are used + * to manage contention. Some vars are only ever touched by one + * "working" thread and are accessed without holding the mutex. + * * Currently internal wakeups (via wake()/wake_notify()) are used to * force a context to check if it has work to do. To minimize trips - * through the kernel, wake() is a no-op if the context is working. - * Conversely, a context must never stop working without checking if - * it has newly arrived work. + * through the kernel, wake() is a no-op if the context has a working + * thread. Conversely, a thread must never stop working without + * checking if it has newly arrived work. * * External wake operations, like pn_connection_wake() and * pn_proactor_interrupt(), are built on top of the internal wake @@ -506,6 +511,13 @@ static inline bool proactor_has_event(pn_proactor_t *p) { return (p->cached_event || (p->cached_event = pn_collector_next(p->collector))); } +static pn_event_t *log_event(void* p, pn_event_t *e) { + if (e) { + pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e))); + } + return e; +} + static void psocket_error(psocket_t *ps, int err, const char* what) { if (ps->is_conn) { pn_connection_driver_t *driver = &as_pconnection(ps)->driver; @@ -622,6 +634,7 @@ void pconnection_begin_close(pconnection_t *pc) { pc->psocket.closing = true; pc->read_closed = pc->write_closed = true; stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd); + pc->current_arm = 0; pn_connection_driver_close(&pc->driver); ptimer_set(&pc->timer, 0); } @@ -856,7 +869,11 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, if (!pc->read_closed) { pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); - if (rbuf.size > 0 && (!pc->read_blocked)) { + if (rbuf.size == 0) { + if (pn_connection_driver_read_closed(&pc->driver)) + pc->read_closed = true; + } + else if (!pc->read_blocked) { ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size); if (n > 0) { @@ -932,6 +949,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, goto retry; // TODO: get rid of goto without adding more locking pc->context.working = false; + pc->hog_count = 0; bool rearm = pconnection_rearm_check(pc); unlock(&pc->context.mutex); @@ -971,6 +989,7 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) // TODO: check case of proactor shutting down lock(&pc->context.mutex); proactor_add(&pc->psocket); + pn_connection_open(pc->driver.connection); /* Auto-open */ struct addrinfo *ai = NULL; int fd = -1; @@ -1026,6 +1045,12 @@ void pn_connection_wake(pn_connection_t* c) { if (notify) wake_notify(&pc->context); } +void pn_proactor_release_connection(pn_connection_t *c) { + pconnection_t *pc = get_pconnection(c); + if (pc) { + pn_connection_driver_release_connection(&pc->driver); + } +} // ======================================================================== // listener @@ -1046,6 +1071,8 @@ pn_listener_t *pn_listener() { pn_listener_free(l); return NULL; } + pn_proactor_t *unknown = NULL; // won't know until pn_proactor_listen + pcontext_init(&l->context, LISTENER, unknown, l); } return l; } @@ -1056,12 +1083,15 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in assert(buf); // TODO: memory safety char *scheme, *user, *pass, *host, *port, *path; pni_parse_url(buf, &scheme, &user, &pass, &host, &port, &path); - pcontext_init(&l->context, LISTENER, p, l); + // TODO: check listener not already listening for this or another proactor + lock(&l->context.mutex); + l->context.proactor = p;; psocket_init(&l->psocket, p, false, host, port); l->backlog = backlog; proactor_add(&l->psocket); /* Always put an OPEN event for symmetry, even if we immediately close with err */ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN); + bool notify = wake(&l->context); struct addrinfo *ai = NULL; int fd = -1; @@ -1076,6 +1106,8 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in l->psocket.epoll_io.fd = fd; l->psocket.epoll_io.wanted = EPOLLIN; start_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd); // TODO: check for error + unlock(&l->context.mutex); + if (notify) wake_notify(&l->context); free(buf); return; } @@ -1083,8 +1115,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in } psocket_error(&l->psocket, errno, "listen on"); + unlock(&l->context.mutex); + if (notify) wake_notify(&l->context); if (ai) freeaddrinfo(ai); - wake(&l->context); free(buf); return; } @@ -1190,7 +1223,7 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { if (e && pn_event_type(e) == PN_LISTENER_CLOSE) l->close_dispatched = true; unlock(&l->context.mutex); - return e; + return log_event(l, e); } static void listener_done(pn_listener_t *l) { @@ -1396,7 +1429,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { e = p->cached_event; unlock(&p->context.mutex); p->cached_event = NULL; - return e; + return log_event(p, e); } static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) { @@ -1711,7 +1744,3 @@ size_t pn_proactor_addr_str(const struct pn_proactor_addr_t* addr, char *buf, si } } -/* FIXME aconway 2017-04-21: dummy to make test link, needs implementation */ -void pn_proactor_release_connection(pn_connection_t *connection) { - abort(); -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
