Attached is a patch for apr_pollset_*.
Changes include:
- Replace the HAVE_* checks with USE_* defines to remove some complex
#ifdef logic.
- Make the pollset partially thread safe (*)
- Remove the limit on the number of sockets that you can add to a
pollset (**)
* The patch allows multiple threads to concurrently call _add() or
_remove(). It is still only safe for one thread to call _poll() at a
time. This applies only to the EPoll and KQueue backends.
** A single _poll() call will still return at most the number of
pollfds you declared in _create() (pollset->nalloc). This applies only
to the EPoll and KQueue backends.
The patch removes the query_set array and instead uses a set of 3
APR_RINGs to manage the pollfds:
query_ring - pollfds that are in active use.
dead_ring - pollfds that have been _remove()'d but may still be in use
by a _poll() in progress; they are recycled after the next _poll().
free_ring - pollfds that are free to reuse.
In _add():
- If there is a spare pollfd on the free_ring, reuse it.
- If not, palloc() one.
In _remove():
- Move the pollfd from the query_ring to the dead_ring.
In _poll():
- After polling is complete, move any pollfds on the dead_ring to the
free_ring, as it is safe to reuse them now (sketched below).
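Since the full patch is hard to read, here is a condensed sketch of
just that ring bookkeeping. Locking, error handling and the actual
kqueue/epoll calls are omitted; the names match the patch, but this
fragment is illustrative only:

typedef struct pfd_elem_t pfd_elem_t;
struct pfd_elem_t {
    APR_RING_ENTRY(pfd_elem_t) link;
    apr_pollfd_t pfd;
};

/* in _add(): reuse a spare element if we have one, otherwise palloc() one */
if (!APR_RING_EMPTY(&pollset->free_ring, pfd_elem_t, link)) {
    elem = APR_RING_FIRST(&pollset->free_ring);
    APR_RING_REMOVE(elem, link);
}
else {
    elem = apr_palloc(pollset->pool, sizeof(pfd_elem_t));
    APR_RING_ELEM_INIT(elem, link);
}
elem->pfd = *descriptor;
APR_RING_INSERT_TAIL(&pollset->query_ring, elem, pfd_elem_t, link);

/* in _remove(): park the element on the dead_ring, since a concurrent
 * _poll() may still be holding a pointer to it */
APR_RING_REMOVE(elem, link);
APR_RING_INSERT_TAIL(&pollset->dead_ring, elem, pfd_elem_t, link);

/* in _poll(), after the results have been copied out: everything on
 * the dead_ring is now safe to recycle */
while (!APR_RING_EMPTY(&pollset->dead_ring, pfd_elem_t, link)) {
    elem = APR_RING_FIRST(&pollset->dead_ring);
    APR_RING_REMOVE(elem, link);
    APR_RING_INSERT_TAIL(&pollset->free_ring, elem, pfd_elem_t, link);
}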
I would like to use these changes in the 'event' MPM for httpd.
My primary motivations for the pollset changes were:
- It is not acceptable for an _add() to fail because the query_set
array is too small.
- It is not acceptable to have to do high-level locking around the
different pollset functions.
With these changes, 'Worker' threads can _add() a socket directly back
into the Event Thread's pollset, without having to wake the Event
Thread or perform any extra locking.
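To make the intended usage concrete, here is a hypothetical fragment of
worker-thread code. None of these names come from the patch or from
httpd; it is just a sketch of the calling pattern the patch allows:

#include "apr_poll.h"

/* Hypothetical: a worker hands a keepalive socket back to the pollset
 * that the event thread is blocked in. */
static apr_status_t give_back_to_event_thread(apr_pollset_t *event_pollset,
                                              apr_socket_t *sock,
                                              apr_pool_t *conn_pool,
                                              void *conn_state)
{
    apr_pollfd_t pfd;

    pfd.p           = conn_pool;
    pfd.desc_type   = APR_POLL_SOCKET;
    pfd.reqevents   = APR_POLLIN;
    pfd.desc.s      = sock;
    pfd.client_data = conn_state;   /* handed back by _poll() later */

    /* Safe even while the event thread is sitting in apr_pollset_poll(),
     * and while other workers are calling _add()/_remove(). */
    return apr_pollset_add(event_pollset, &pfd);
}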
I have tested these changes on both Linux 2.6 (EPoll) and Mac OS X
10.3.5 (KQueue).
Currently, the poll.c file has turned into an "#ifdef hell". In the
near future, I would like to break the current poll.c file out into
multiple files, one for each major implementation, e.g.:
pollset_epoll.c
pollset_kqueue.c
pollset_select.c
pollset_poll.c
If there are no objections to breaking it up into multiple files, I
will take a swing at it this weekend.
I almost consider the current patch to be unreviewable, because of all
the things I had to change. If I can break pollset into multiple files,
hopefully I can produce a much more reviewable patch. :)
A word of thanks to Rici Lake for helping me debug these changes via
IRC.
As always, comments are very welcome.
-Paul Querna
Index: configure.in
===================================================================
RCS file: /home/cvspublic/apr/configure.in,v
retrieving revision 1.597
diff -u -r1.597 configure.in
--- configure.in 1 Aug 2004 00:52:20 -0000 1.597
+++ configure.in 23 Sep 2004 02:09:52 -0000
@@ -633,7 +633,8 @@
AC_SUBST(have_sigsuspend)
AC_SUBST(have_sigwait)
-AC_CHECK_FUNCS(poll kqueue)
+AC_CHECK_FUNCS(poll)
+AC_CHECK_FUNCS(kqueue, [ apr_cv_kqueue=yes ], [ apr_cv_kqueue=no ])
# Check for the Linux epoll interface; epoll* may be available in libc
# but return ENOSYS on a pre-2.6 kernel, so do a run-time check.
@@ -651,6 +652,12 @@
AC_DEFINE([HAVE_EPOLL], 1, [Define if the epoll interface is supported])
fi
+if test "$apr_cv_epoll" = "yes" -o "$apr_cv_kqueue" = "yes"; then
+ addpolling=1
+else
+ addpolling=0
+fi
+
dnl ----------------------------- Checking for missing POSIX thread functions
AC_CHECK_FUNCS([getpwnam_r getpwuid_r getgrnam_r getgrgid_r])
@@ -1341,6 +1348,7 @@
AC_SUBST(stdint)
AC_SUBST(bigendian)
AC_SUBST(aprlfs)
+AC_SUBST(addpolling)
dnl ----------------------------- Checking for string functions
AC_CHECK_FUNCS(strnicmp, have_strnicmp="1", have_strnicmp="0")
Index: include/apr.h.in
===================================================================
RCS file: /home/cvspublic/apr/include/apr.h.in,v
retrieving revision 1.137
diff -u -r1.137 apr.h.in
--- include/apr.h.in 5 Jun 2004 11:52:43 -0000 1.137
+++ include/apr.h.in 23 Sep 2004 02:09:52 -0000
@@ -220,6 +220,7 @@
#define APR_HAS_LARGE_FILES @aprlfs@
#define APR_HAS_XTHREAD_FILES 0
#define APR_HAS_OS_UUID 0
+#define APR_HAS_ADD_WHILE_POLLING @addpolling@
/* APR sets APR_FILES_AS_SOCKETS to 1 on systems where it is possible
* to poll on files/pipes.
Index: poll/unix/poll.c
===================================================================
RCS file: /home/cvspublic/apr/poll/unix/poll.c,v
retrieving revision 1.46
diff -u -r1.46 poll.c
--- poll/unix/poll.c 7 Jul 2004 07:40:12 -0000 1.46
+++ poll/unix/poll.c 23 Sep 2004 02:09:52 -0000
@@ -41,7 +41,24 @@
#define HAS_PIPES(dt) (dt == APR_POLL_FILE) ? 1 : 0
#endif
+/* Choose the best method platform specific to use in apr_pollset */
#ifdef HAVE_KQUEUE
+#define USE_KQUEUE
+#elif defined(HAVE_EPOLL)
+#define USE_EPOLL
+#elif defined(HAVE_POLL)
+#define USE_POLL
+#else
+#define USE_SELECT
+#endif
+
+#if defined(USE_KQUEUE) || defined(USE_EPOLL)
+#define USE_RING_SETS
+#include "apr_ring.h"
+#endif
+
+
+#ifdef USE_KQUEUE
static apr_int16_t get_kqueue_revent(apr_int16_t event, apr_int16_t flags)
{
apr_int16_t rv = 0;
@@ -55,10 +72,9 @@
return rv;
}
-
#endif
-#ifdef HAVE_EPOLL
+#ifdef USE_EPOLL
static apr_int16_t get_epoll_event(apr_int16_t event)
{
apr_int16_t rv = 0;
@@ -98,7 +114,7 @@
}
#endif
-#ifdef HAVE_POLL /* We can just use poll to do our socket polling. */
+#ifdef HAVE_POLL /* apr_poll() always uses Poll() when it is available */
static apr_int16_t get_event(apr_int16_t event)
{
@@ -212,11 +228,11 @@
return APR_SUCCESS;
}
+#elif defined(USE_SELECT) /* Use select to mimic poll */
-#else /* Use select to mimic poll */
-
-APR_DECLARE(apr_status_t) apr_poll(apr_pollfd_t *aprset, int num, apr_int32_t *nsds,
- apr_interval_time_t timeout)
+APR_DECLARE(apr_status_t) apr_poll(apr_pollfd_t *aprset, int num,
+ apr_int32_t *nsds,
+ apr_interval_time_t timeout)
{
fd_set readset, writeset, exceptset;
int rv, i;
@@ -345,54 +361,84 @@
return APR_SUCCESS;
}
-
+#else
+#error apr_poll() requires poll() or select()
#endif
+#ifdef USE_RING_SETS
+typedef struct pfd_elem_t pfd_elem_t;
+
+struct pfd_elem_t {
+ APR_RING_ENTRY(pfd_elem_t) link;
+ apr_pollfd_t pfd;
+};
+#endif
+
struct apr_pollset_t {
apr_pool_t *pool;
-
- apr_uint32_t nelts;
- apr_uint32_t nalloc;
-#ifdef HAVE_KQUEUE
+ apr_uint32_t nelts; /* number of sockets in the query set */
+ apr_uint32_t nalloc; /* maximum size of the result set */
+#ifdef USE_RING_SETS
+ /** A thread mutex to protect operations on the rings */
+ apr_thread_mutex_t *ring_lock;
+
+ /** A ring containing all of the pollfd_t that are active */
+ APR_RING_HEAD(pfd_query_ring_t, pfd_elem_t) query_ring;
+
+ /** A ring of pollfd_t that have been used, and then _remove()'d */
+ APR_RING_HEAD(pfd_free_ring_t, pfd_elem_t) free_ring;
+
+ /** A ring of pollfd_t where rings that have been _remove()`ed but
+ might still be inside a _poll() */
+ APR_RING_HEAD(pfd_dead_ring_t, pfd_elem_t) dead_ring;
+#else
+ apr_pollfd_t *query_set;
+#endif
+ apr_pollfd_t *result_set;
+#ifdef USE_KQUEUE
int kqueue_fd;
struct kevent kevent;
struct kevent *ke_set;
-#elif defined(HAVE_EPOLL)
+#endif
+#ifdef USE_EPOLL
int epoll_fd;
struct epoll_event *pollset;
-#elif defined(HAVE_POLL)
+#endif
+#ifdef USE_POLL
struct pollfd *pollset;
-#else
+#endif
+#ifdef USE_SELECT
fd_set readset, writeset, exceptset;
int maxfd;
-#endif
- apr_pollfd_t *query_set;
- apr_pollfd_t *result_set;
-
#ifdef NETWARE
int set_type;
#endif
+#endif
};
-#if defined(HAVE_KQUEUE) || defined(HAVE_EPOLL)
+
+#if defined(USE_KQUEUE) || defined(USE_EPOLL)
static apr_status_t backend_cleanup(void *p_)
{
apr_pollset_t *pollset = (apr_pollset_t *)p_;
-#ifdef HAVE_KQUEUE
+#ifdef USE_KQUEUE
close(pollset->kqueue_fd);
-#elif defined(HAVE_EPOLL)
+#elif defined(USE_EPOLL)
close(pollset->epoll_fd);
#endif
return APR_SUCCESS;
}
-#endif /* HAVE_KQUEUE || HAVE_EPOLL */
+#endif /* USE_KQUEUE || USE_EPOLL */
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
apr_uint32_t flags)
{
-#if !defined(HAVE_KQUEUE) && !defined(HAVE_EPOLL) && !defined(HAVE_POLL) && defined(FD_SETSIZE)
+#ifdef USE_RING_SETS
+ apr_status_t rv;
+#endif
+#if defined(USE_SELECT) && defined(FD_SETSIZE)
if (size > FD_SETSIZE) {
*pollset = NULL;
return APR_EINVAL;
@@ -402,8 +448,15 @@
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->pool = p;
-#ifdef HAVE_KQUEUE
- (*pollset)->ke_set = (struct kevent*)apr_palloc(p, size * sizeof(struct kevent));
+#ifdef USE_RING_SETS
+ if ((rv = apr_thread_mutex_create(&(*pollset)->ring_lock,
+ APR_THREAD_MUTEX_DEFAULT, (*pollset)->pool)) != APR_SUCCESS) {
+ return rv;
+ }
+#endif
+
+#ifdef USE_KQUEUE
+ (*pollset)->ke_set = (struct kevent*)apr_palloc(p, size * sizeof(struct kevent));
memset((*pollset)->ke_set, 0, size * sizeof(struct kevent));
(*pollset)->kqueue_fd = kqueue();
if ((*pollset)->kqueue_fd == -1) {
@@ -411,14 +464,14 @@
}
apr_pool_cleanup_register(p, (void*)(*pollset), backend_cleanup,
apr_pool_cleanup_null);
-#elif defined(HAVE_EPOLL)
+#elif defined(USE_EPOLL)
(*pollset)->epoll_fd = epoll_create(size);
(*pollset)->pollset = apr_palloc(p, size * sizeof(struct epoll_event));
apr_pool_cleanup_register(p, (void*)(*pollset), backend_cleanup,
apr_pool_cleanup_null);
-#elif defined(HAVE_POLL)
+#elif defined(USE_POLL)
(*pollset)->pollset = apr_palloc(p, size * sizeof(struct pollfd));
-#else
+#elif defined(USE_SELECT)
FD_ZERO(&((*pollset)->readset));
FD_ZERO(&((*pollset)->writeset));
FD_ZERO(&((*pollset)->exceptset));
@@ -426,8 +479,15 @@
#ifdef NETWARE
(*pollset)->set_type = APR_NO_DESC;
#endif
-#endif
+#endif /* USE_SELECT */
+
+#ifdef USE_RING_SETS
+ APR_RING_INIT(&(*pollset)->query_ring, pfd_elem_t, link);
+ APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
+ APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
+#else
(*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+#endif
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
return APR_SUCCESS;
@@ -435,7 +495,7 @@
APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t *pollset)
{
-#if defined(HAVE_KQUEUE) || defined(HAVE_EPOLL)
+#if defined(USE_KQUEUE) || defined(USE_EPOLL)
return apr_pool_cleanup_run(pollset->pool, pollset, backend_cleanup);
#else
return APR_SUCCESS;
@@ -445,24 +505,53 @@
APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
const apr_pollfd_t *descriptor)
{
-#ifdef HAVE_KQUEUE
+#if defined(USE_KQUEUE) || defined(USE_SELECT)
apr_os_sock_t fd;
-#elif defined(HAVE_EPOLL)
+#elif defined(USE_EPOLL)
struct epoll_event ev;
int ret = -1;
-#else
-#if !defined(HAVE_POLL)
- apr_os_sock_t fd;
-#endif
#endif
-
+#ifdef USE_RING_SETS
+ pfd_elem_t* elem;
+ apr_status_t rv = APR_SUCCESS;
+#endif
+
+#ifdef USE_RING_SETS
+ /**
+ * Doing _add() in a Thread Safe Manner:
+ * - Lock the Rings to protect us from other threads
+ * doing an _add() or _remove().
+ * - Get a Ring Element to use:
+ * - From the free_ring.
+ * - Allocate a new element.
+ * - Copy the pollfd into the element.
+ * - Add it to the query_ring
+ * - Do the KQueue/EPoll Add.
+ * - If the Add Fails:
+ * - Move the Element to the Free Ring from the Query Ring
+ * - Unlock the Rings
+ */
+
+ apr_thread_mutex_lock(pollset->ring_lock);
+
+ if(!APR_RING_EMPTY(&(pollset->free_ring), pfd_elem_t, link)) {
+ elem = APR_RING_FIRST(&(pollset->free_ring));
+ APR_RING_REMOVE(elem, link);
+ }
+ else {
+ elem = (pfd_elem_t*) apr_palloc(pollset->pool, sizeof(pfd_elem_t));
+ APR_RING_ELEM_INIT(elem, link);
+ }
+ elem->pfd = *descriptor;
+ APR_RING_INSERT_TAIL(&(pollset->query_ring), elem, pfd_elem_t, link);
+#else
if (pollset->nelts == pollset->nalloc) {
return APR_ENOMEM;
}
-
pollset->query_set[pollset->nelts] = *descriptor;
+#endif
-#ifdef HAVE_KQUEUE
+#ifdef USE_KQUEUE
if (descriptor->desc_type == APR_POLL_SOCKET) {
fd = descriptor->desc.s->socketdes;
}
@@ -471,39 +560,40 @@
}
if (descriptor->reqevents & APR_POLLIN) {
- EV_SET(&pollset->kevent, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+ EV_SET(&pollset->kevent, fd, EVFILT_READ, EV_ADD,
+ 0, 0, elem);
if (kevent(pollset->kqueue_fd, &pollset->kevent, 1, NULL, 0,
NULL) == -1) {
- return APR_ENOMEM;
+ rv = APR_ENOMEM;
}
}
- if (descriptor->reqevents & APR_POLLOUT) {
- EV_SET(&pollset->kevent, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+ if (descriptor->reqevents & APR_POLLOUT && rv == APR_SUCCESS) {
+ EV_SET(&pollset->kevent, fd, EVFILT_WRITE, EV_ADD,
+ 0, 0, elem);
if (kevent(pollset->kqueue_fd, &pollset->kevent, 1, NULL, 0,
NULL) == -1) {
- return APR_ENOMEM;
+ rv = APR_ENOMEM;
}
}
-#elif defined(HAVE_EPOLL)
+#elif defined(USE_EPOLL)
ev.events = get_epoll_event(descriptor->reqevents);
+ ev.data.ptr = (void*) elem;
if (descriptor->desc_type == APR_POLL_SOCKET) {
- ev.data.fd = descriptor->desc.s->socketdes;
ret = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD,
descriptor->desc.s->socketdes, &ev);
}
else {
- ev.data.fd = descriptor->desc.f->filedes;
ret = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD,
descriptor->desc.f->filedes, &ev);
}
if (0 != ret) {
- return APR_EBADF;
+ rv = APR_EBADF;
}
-#elif defined(HAVE_POLL)
+#elif defined(USE_POLL)
if (descriptor->desc_type == APR_POLL_SOCKET) {
pollset->pollset[pollset->nelts].fd = descriptor->desc.s->socketdes;
@@ -513,7 +603,7 @@
}
pollset->pollset[pollset->nelts].events = get_event(descriptor->reqevents);
-#else
+#elif defined(USE_SELECT)
if (descriptor->desc_type == APR_POLL_SOCKET) {
#ifdef NETWARE
/* NetWare can't handle mixed descriptor types in select() */
@@ -541,8 +631,8 @@
}
#else
fd = descriptor->desc.f->filedes;
-#endif
-#endif
+#endif /* NETWARE */
+#endif /* !APR_FILES_AS_SOCKETS */
}
#if !defined(WIN32) && !defined(NETWARE) /* socket sets handled with array of handles */
if (fd >= FD_SETSIZE) {
@@ -563,6 +653,20 @@
if ((int)fd > pollset->maxfd) {
pollset->maxfd = (int)fd;
}
+#endif /* USE_SELECT */
+
+#ifdef USE_RING_SETS
+ if(rv != APR_SUCCESS) {
+ /** _add() Failed. Move the Element into the free_ring. */
+ APR_RING_REMOVE(elem, link);
+ APR_RING_INSERT_TAIL(&(pollset->free_ring), elem, pfd_elem_t, link);
+ }
+ else {
+ pollset->nelts++;
+ }
+
+ apr_thread_mutex_unlock(pollset->ring_lock);
+ return rv;
#endif
pollset->nelts++;
return APR_SUCCESS;
@@ -571,98 +675,95 @@
APR_DECLARE(apr_status_t) apr_pollset_remove(apr_pollset_t *pollset,
const apr_pollfd_t *descriptor)
{
- apr_uint32_t i;
-#ifdef HAVE_KQUEUE
+#if defined(USE_KQUEUE) || defined(USE_SELECT)
apr_os_sock_t fd;
-#elif defined(HAVE_EPOLL)
+#elif defined(USE_EPOLL)
struct epoll_event ev;
int ret = -1;
-#elif !defined(HAVE_POLL)
- apr_os_sock_t fd;
#endif
+#ifdef USE_RING_SETS
+ pfd_elem_t* ep;
+ apr_status_t rv = APR_SUCCESS;
+#endif
+
+#ifdef USE_RING_SETS
+ /**
+ * Doing _remove() in a Thread Safe Manner:
+ * - Lock the Rings to protect us from other threads
+ * doing an _add() or _remove().
+ * - Remove the Socket from the epoll/kqueue
+ *
+ * !!!! Warning: There is a possible race condition here:
+ *
+ * If after we remove an FD from the epoll/kqueue
+ * they still show up in _poll() we are FUBAR.
+ *
+ * What we assume is that after removing them,
+ * they will _never_ turn up again in _poll().
+ *
+ * However, to avoid this possible Race condition, there is a
+ * 3rd ring, the 'dead_ring', where sockets are moved to be
+ * free`ed. At the end of _poll() we will clear the dead_ring.
+ *
+ * - Search for the correct element in the query_ring
+ * - Move the Matching Elements to the Free Ring from the Query Ring
+ * - Unlock the Rings
+ */
-#ifdef HAVE_KQUEUE
- for (i = 0; i < pollset->nelts; i++) {
- if (descriptor->desc.s == pollset->query_set[i].desc.s) {
- /* Found an instance of the fd: remove this and any other copies */
- apr_uint32_t dst = i;
- apr_uint32_t old_nelts = pollset->nelts;
- pollset->nelts--;
- for (i++; i < old_nelts; i++) {
- if (descriptor->desc.s == pollset->query_set[i].desc.s) {
- pollset->nelts--;
- }
- else {
- pollset->query_set[dst] = pollset->query_set[i];
- dst++;
- }
- }
+ apr_thread_mutex_lock(pollset->ring_lock);
- if (descriptor->desc_type == APR_POLL_SOCKET) {
- fd = descriptor->desc.s->socketdes;
- }
- else {
- fd = descriptor->desc.f->filedes;
- }
-
- if (descriptor->reqevents & APR_POLLIN) {
- EV_SET(&pollset->kevent, fd,
- EVFILT_READ, EV_DELETE, 0, 0, NULL);
+#endif
- if (kevent(pollset->kqueue_fd, &pollset->kevent, 1, NULL, 0,
- NULL) == -1) {
- return APR_EBADF;
- }
+#ifdef USE_KQUEUE
+ if (descriptor->desc_type == APR_POLL_SOCKET) {
+ fd = descriptor->desc.s->socketdes;
+ }
+ else {
+ fd = descriptor->desc.f->filedes;
+ }
+
+ do {
+ if (descriptor->reqevents & APR_POLLIN) {
+ EV_SET(&pollset->kevent, fd,
+ EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+ if (kevent(pollset->kqueue_fd, &pollset->kevent, 1, NULL, 0,
+ NULL) == -1) {
+ rv = APR_NOTFOUND;
+ break;
+ }
+ }
+
+ if (descriptor->reqevents & APR_POLLOUT) {
+ EV_SET(&pollset->kevent, fd,
+ EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+ if (kevent(pollset->kqueue_fd, &pollset->kevent, 1, NULL, 0,
+ NULL) == -1) {
+ rv = APR_NOTFOUND;
+ break;
}
+ }
+ } while(0);
- if (descriptor->reqevents & APR_POLLOUT) {
- EV_SET(&pollset->kevent, fd,
- EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-
- if (kevent(pollset->kqueue_fd, &pollset->kevent, 1, NULL, 0,
- NULL) == -1) {
- return APR_EBADF;
- }
- }
+#elif defined(USE_EPOLL)
- return APR_SUCCESS;
- }
+ ev.events = get_epoll_event(descriptor->reqevents);
+ if (descriptor->desc_type == APR_POLL_SOCKET) {
+ ret = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_DEL,
+ descriptor->desc.s->socketdes, &ev);
}
-#elif defined(HAVE_EPOLL)
- for (i = 0; i < pollset->nelts; i++) {
- if (descriptor->desc.s == pollset->query_set[i].desc.s) {
- /* Found an instance of the fd: remove this and any other copies */
- apr_uint32_t dst = i;
- apr_uint32_t old_nelts = pollset->nelts;
- pollset->nelts--;
- for (i++; i < old_nelts; i++) {
- if (descriptor->desc.s == pollset->query_set[i].desc.s) {
- pollset->nelts--;
- }
- else {
- pollset->query_set[dst] = pollset->query_set[i];
- dst++;
- }
- }
- ev.events = get_epoll_event(descriptor->reqevents);
- if (descriptor->desc_type == APR_POLL_SOCKET) {
- ev.data.fd = descriptor->desc.s->socketdes;
- ret = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_DEL,
- descriptor->desc.s->socketdes, &ev);
- }
- else {
- ev.data.fd = descriptor->desc.f->filedes;
- ret = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_DEL,
- descriptor->desc.f->filedes, &ev);
- }
- if (ret < 0) {
- return APR_EBADF;
- }
-
- return APR_SUCCESS;
- }
+ else {
+ ret = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_DEL,
+ descriptor->desc.f->filedes, &ev);
}
-#elif defined(HAVE_POLL)
+
+ if (ret < 0) {
+ rv = APR_NOTFOUND;
+ }
+
+#elif defined(USE_POLL)
+
for (i = 0; i < pollset->nelts; i++) {
if (descriptor->desc.s == pollset->query_set[i].desc.s) {
/* Found an instance of the fd: remove this and any other copies */
@@ -683,7 +784,7 @@
}
}
-#else /* no poll */
+#elif defined(USE_SELECT)
if (descriptor->desc_type == APR_POLL_SOCKET) {
fd = descriptor->desc.s->socketdes;
}
@@ -719,18 +820,40 @@
return APR_SUCCESS;
}
}
-#endif /* no poll */
+#endif /* USE_SELECT */
+
+#ifdef USE_RING_SETS
+ if(!APR_RING_EMPTY(&(pollset->query_ring), pfd_elem_t, link)) {
+ for(ep = APR_RING_FIRST(&(pollset->query_ring));
+ ep != APR_RING_SENTINEL(&(pollset->query_ring),
+ pfd_elem_t, link);
+ ep = APR_RING_NEXT(ep, link)) {
+
+ if(descriptor->desc.s == ep->pfd.desc.s) {
+ APR_RING_REMOVE(ep, link);
+ APR_RING_INSERT_TAIL(&(pollset->dead_ring),
+ ep, pfd_elem_t, link);
+ break;
+ }
+ }
+ }
+
+ apr_thread_mutex_unlock(pollset->ring_lock);
+
+ return rv;
+#endif
return APR_NOTFOUND;
}
-#ifdef HAVE_KQUEUE
+
+#ifdef USE_KQUEUE
+
APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_interval_time_t timeout,
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int rv;
- apr_uint32_t i, j, r = 0;
+ int rv, i;
struct timespec tv, *tvptr;
if (timeout < 0) {
@@ -742,60 +865,56 @@
tvptr = &tv;
}
- rv = kevent(pollset->kqueue_fd, NULL, 0, pollset->ke_set, pollset->nelts,
+ rv = kevent(pollset->kqueue_fd, NULL, 0, pollset->ke_set, pollset->nalloc,
tvptr);
(*num) = rv;
if (rv < 0) {
return apr_get_netos_error();
}
+
if (rv == 0) {
return APR_TIMEUP;
}
- /* TODO: Is there a better way to re-associate our data? */
- for (i = 0; i < pollset->nelts; i++) {
- apr_os_sock_t fd;
- if (pollset->query_set[i].desc_type == APR_POLL_SOCKET) {
- fd = pollset->query_set[i].desc.s->socketdes;
- }
- else {
- fd = pollset->query_set[i].desc.f->filedes;
- }
- for (j = 0; j < rv; j++) {
- if (pollset->ke_set[j].ident == fd ) {
- pollset->result_set[r] = pollset->query_set[i];
- pollset->result_set[r].rtnevents =
- get_kqueue_revent(pollset->ke_set[j].filter,
- pollset->ke_set[j].flags);
- r++;
- }
- }
+ for (i = 0; i < rv; i++) {
+ pollset->result_set[i] =
+ (((pfd_elem_t*)(pollset->ke_set[i].udata))->pfd);
+ pollset->result_set[i].rtnevents =
+ get_kqueue_revent(pollset->ke_set[i].filter,
+ pollset->ke_set[i].flags);
}
- (*num) = r;
-
if (descriptors) {
*descriptors = pollset->result_set;
}
+ apr_thread_mutex_lock(pollset->ring_lock);
+ /* Shift anything in the Dead Ring to be Free Ring */
+ while (!APR_RING_EMPTY(&(pollset->dead_ring), pfd_elem_t, link)) {
+ ep = APR_RING_FIRST(&(pollset->dead_ring));
+ APR_RING_REMOVE(ep, link);
+ APR_RING_INSERT_TAIL(&(pollset->free_ring), ep, pfd_elem_t, link);
+ }
+ apr_thread_mutex_unlock(pollset->ring_lock);
+
return APR_SUCCESS;
}
-#elif defined(HAVE_EPOLL)
+#elif defined(USE_EPOLL)
APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_interval_time_t timeout,
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int rv;
- apr_uint32_t i, j, k;
-
+ int rv, i;
+ pfd_elem_t* ep;
+
if (timeout > 0) {
timeout /= 1000;
}
- rv = epoll_wait(pollset->epoll_fd, pollset->pollset, pollset->nelts,
+ rv = epoll_wait(pollset->epoll_fd, pollset->pollset, pollset->nalloc,
timeout);
(*num) = rv;
if (rv < 0) {
@@ -804,38 +923,32 @@
if (rv == 0) {
return APR_TIMEUP;
}
- j = 0;
- for (i = 0; i < pollset->nelts; i++) {
- if (pollset->pollset[i].events != 0) {
- /* TODO: Is there a better way to re-associate our data? */
- for (k = 0; k < pollset->nelts; k++) {
- if (pollset->query_set[k].desc_type == APR_POLL_SOCKET &&
- pollset->query_set[k].desc.s->socketdes ==
- pollset->pollset[i].data.fd) {
- pollset->result_set[j] = pollset->query_set[k];
- pollset->result_set[j].rtnevents =
- get_epoll_revent(pollset->pollset[i].events);
- j++;
- break;
- }
- else if (pollset->query_set[k].desc_type == APR_POLL_FILE
- && pollset->query_set[k].desc.f->filedes ==
- pollset->pollset[i].data.fd) {
- pollset->result_set[j] = pollset->query_set[k];
- pollset->result_set[j].rtnevents =
- get_epoll_revent(pollset->pollset[i].events);
- j++;
- break;
- }
- }
- }
+
+ for (i = 0; i < rv; i++) {
+ pollset->result_set[i] =
+ (((pfd_elem_t*)(pollset->pollset[i].data.ptr))->pfd);
+ pollset->result_set[i].rtnevents =
+ get_epoll_revent(pollset->pollset[i].events);
}
+
if (descriptors) {
*descriptors = pollset->result_set;
}
+
+ apr_thread_mutex_lock(pollset->ring_lock);
+ /* Shift anything in the Dead Ring to be Free Ring */
+ while (!APR_RING_EMPTY(&(pollset->dead_ring), pfd_elem_t, link)) {
+ ep = APR_RING_FIRST(&(pollset->dead_ring));
+ APR_RING_REMOVE(ep, link);
+ APR_RING_INSERT_TAIL(&(pollset->free_ring), ep, pfd_elem_t, link);
+ }
+ apr_thread_mutex_unlock(pollset->ring_lock);
+
return APR_SUCCESS;
}
-#elif defined(HAVE_POLL)
+
+#elif defined(USE_POLL)
+
APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_interval_time_t timeout,
apr_int32_t *num,
@@ -869,7 +982,7 @@
return APR_SUCCESS;
}
-#else /* no poll */
+#elif defined(USE_SELECT)
APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_interval_time_t timeout,
@@ -944,4 +1057,4 @@
return APR_SUCCESS;
}
-#endif /* no poll */
+#endif /* USE_SELECT */