Repository: qpid-proton
Updated Branches:
  refs/heads/master c4d16584a -> b0be770d7
PROTON-1460: epoll additional error checks

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b0be770d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b0be770d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b0be770d

Branch: refs/heads/master
Commit: b0be770d752c5067fb806fb081f1693418b52d02
Parents: c4d1658
Author: Clifford Jansen <[email protected]>
Authored: Sun May 14 18:21:44 2017 -0700
Committer: Clifford Jansen <[email protected]>
Committed: Sun May 14 18:21:44 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 35 +++++++++++++++++++++++++++++------
 1 file changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0be770d/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index fae68e9..9fd7d06 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -54,7 +54,7 @@
 #include <time.h>
 
 // TODO: replace timerfd per connection with global lightweight timer mechanism.
-// logging in general, listener events in particular
+// logging in general
 // SIGPIPE?
 // Can some of the mutexes be spinlocks (any benefit over adaptive pthread mutex)?
 // Maybe futex is even better?
@@ -64,6 +64,13 @@
 // looking for pending wakes before a kernel call to epoll_wait(), or there
 // could be several eventfds with random assignment of wakeables.
 
+/* Internal error, no recovery */ +#define EPOLL_FATAL(EXPR, SYSERRNO) \ + do { \ + fprintf(stderr, "epoll proactor failure in %s:%d: %s: %s\n", \ + __FILE__, __LINE__ , #EXPR, strerror(SYSERRNO)); \ + abort(); \ + } while (0) // ======================================================================== // First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste. @@ -103,6 +110,7 @@ typedef struct epoll_extended_t { int fd; epoll_type_t type; // io/timer/wakeup uint32_t wanted; // events to poll for + bool polling; } epoll_extended_t; /* @@ -130,6 +138,7 @@ static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) { pt->epoll_io.fd = pt->timerfd; pt->epoll_io.type = type; pt->epoll_io.wanted = EPOLLIN; + pt->epoll_io.polling = false; return true; } @@ -213,6 +222,9 @@ PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) static bool start_polling(epoll_extended_t *ee, int epollfd) { + if (ee->polling) + return false; + ee->polling = true; struct epoll_event ev; ev.data.ptr = ee; ev.events = ee->wanted | EPOLLONESHOT; @@ -221,13 +233,15 @@ static bool start_polling(epoll_extended_t *ee, int epollfd) { static void stop_polling(epoll_extended_t *ee, int epollfd) { // TODO: check for error, return bool or just log? 
- if (ee->fd == -1) + if (ee->fd == -1 || !ee->polling || epollfd == -1) return; struct epoll_event ev; ev.data.ptr = ee; ev.events = 0; - epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev); // TODO: check for error + if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1) + EPOLL_FATAL("EPOLL_CTL_DEL", errno); ee->fd = -1; + ee->polling = false; } /* @@ -362,9 +376,11 @@ static bool wake(pcontext_t *ctx) { // part2: make OS call without lock held static inline void wake_notify(pcontext_t *ctx) { + if (ctx->proactor->eventfd == -1) + return; uint64_t increment = 1; - int err = write(ctx->proactor->eventfd, &increment, sizeof(uint64_t)); - (void)err; // TODO: check for error + if (write(ctx->proactor->eventfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t)) + EPOLL_FATAL("setting eventfd", errno); } // call with no locks @@ -407,6 +423,7 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listene ps->epoll_io.fd = -1; ps->epoll_io.type = listener ? LISTENER_IO : PCONNECTION_IO; ps->epoll_io.wanted = 0; + ps->epoll_io.polling = false; ps->proactor = p; ps->listener = listener; ps->sockfd = -1; @@ -540,7 +557,8 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) { struct epoll_event ev; ev.data.ptr = ee; ev.events = ee->wanted | EPOLLONESHOT; - epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev); // TODO: check for error + if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1) + EPOLL_FATAL("arming polled file descriptor", errno); } // ======================================================================== @@ -1007,6 +1025,7 @@ static void pconnection_start(pconnection_t *pc) { epoll_extended_t *ee = &pc->psocket.epoll_io; ee->fd = pc->psocket.sockfd; ee->wanted = EPOLLIN | EPOLLOUT; + ee->polling = false; start_polling(ee, efd); // TODO: check for error } @@ -1172,6 +1191,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in ps->sockfd = fd; ps->epoll_io.fd = fd; ps->epoll_io.wanted = EPOLLIN; + 
ps->epoll_io.polling = false; start_polling(&ps->epoll_io, ps->proactor->epollfd); // TODO: check for error } } @@ -1413,6 +1433,7 @@ pn_proactor_t *pn_proactor() { p->epoll_wake.fd = p->eventfd; p->epoll_wake.type = WAKE; p->epoll_wake.wanted = EPOLLIN; + p->epoll_wake.polling = false; start_polling(&p->epoll_wake, p->epollfd); // TODO: check for error return p; } @@ -1429,7 +1450,9 @@ pn_proactor_t *pn_proactor() { void pn_proactor_free(pn_proactor_t *p) { // No competing threads, not even a pending timer close(p->epollfd); + p->epollfd = -1; close(p->eventfd); + p->eventfd = -1; ptimer_finalize(&p->timer); while (p->contexts) { pcontext_t *ctx = p->contexts; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
