Hi,

Since the WIN32 imposes pretty nasty limit on FD_SETSIZE to 64, that
is way too low for any serious usage, I developed an alternative
implementation.
Also the code support the APR_POLLSET_THREADSAFE flag.

A simple patch to apr_arch_poll_private.h allows to have
multiple implementations compilable.

Any comments?

Regards,
Mladen.

/* Copyright 2000-2004 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "apr.h"
#include "apr_poll.h"
#include "apr_time.h"
#include "apr_portable.h"
#include "apr_arch_networkio.h"
#include "apr_arch_file_io.h"
#include "apr_arch_poll_private.h"

#if POLL_USES_WEVENT

struct apr_pollset_t
{
    apr_pool_t *pool;
    apr_interval_time_t timeout;
    apr_uint32_t nelts;
    apr_uint32_t nalloc;
    HANDLE       *hevents;
    SOCKET       *sockets;
    int maxfd;
    apr_pollfd_t *query_set;
    apr_pollfd_t *result_set;
    LPCRITICAL_SECTION mutex;
};

static apr_status_t pollset_cleanup(void *pollset)
{
    apr_uint32_t i;
    apr_pollset_t *ps = pollset;
    if (ps->mutex)
        DeleteCriticalSection(ps->mutex);
    ps->mutex = NULL;
    for (i = 0; i < ps->nelts; i++) {
        if (ps->hevents[i])
            WSACloseEvent(ps->hevents[i]);
    }
    return APR_SUCCESS;
}

APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                             apr_uint32_t size,
                                             apr_pool_t *p,
                                             apr_uint32_t flags)
{
    *pollset = apr_palloc(p, sizeof(**pollset));
    (*pollset)->nelts = 0;
    (*pollset)->nalloc = size;
    (*pollset)->pool = p;
    (*pollset)->maxfd = 0;
    (*pollset)->timeout = 0;
    (*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
    (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
    (*pollset)->hevents = apr_palloc(p, size * sizeof(HANDLE));
    (*pollset)->sockets = apr_palloc(p, size * sizeof(SOCKET));
    if (flags & APR_POLLSET_THREADSAFE) {
        (*pollset)->mutex = apr_palloc(p, sizeof(CRITICAL_SECTION));
        InitializeCriticalSection((*pollset)->mutex);
    }
    else
        (*pollset)->mutex = NULL;
    apr_pool_cleanup_register((*pollset)->pool, (void *)(*pollset),
                              pollset_cleanup, apr_pool_cleanup_null);

    return APR_SUCCESS;
}

APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t * pollset)
{
    apr_pool_cleanup_kill(pollset->pool, pollset, pollset_cleanup);
    return pollset_cleanup(pollset);
}

APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
                                          const apr_pollfd_t *descriptor)
{
    apr_os_sock_t fd;
    DWORD reqevents = FD_CLOSE;
    if (pollset->nelts == pollset->nalloc) {
        return APR_ENOMEM;
    }
    if (descriptor->desc_type == APR_POLL_SOCKET)
        fd = descriptor->desc.s->socketdes;
    else
        return APR_EBADF;

    pollset->query_set[pollset->nelts] = *descriptor;
    pollset->sockets[pollset->nelts] = (SOCKET)fd;
    if (descriptor->reqevents & APR_POLLIN)
        reqevents |= (FD_READ | FD_ACCEPT | FD_CONNECT);
    if (descriptor->reqevents & APR_POLLOUT)
        reqevents |= FD_WRITE;
    if (descriptor->reqevents & APR_POLLPRI)
        reqevents |= FD_QOS;
    if ((pollset->hevents[pollset->nelts] = WSACreateEvent()) == 
WSA_INVALID_EVENT) {
        pollset->hevents[pollset->nelts] = NULL;
        return apr_get_netos_error();
    }

    if (WSAEventSelect(fd, pollset->hevents[pollset->nelts],
                       reqevents) == SOCKET_ERROR) {
        apr_status_t err = apr_get_netos_error();
        WSACloseEvent(pollset->hevents[pollset->nelts]);
        printf("WSAEventSelect failed ");
        return err;
    }
    if (pollset->mutex)
        EnterCriticalSection(pollset->mutex);
    if ((int) fd > pollset->maxfd) {
        pollset->maxfd = (int) fd;
    }
    pollset->nelts++;
    if (pollset->mutex)
        LeaveCriticalSection(pollset->mutex);
    return APR_SUCCESS;
}

APR_DECLARE(apr_status_t) apr_pollset_remove(apr_pollset_t * pollset,
                                             const apr_pollfd_t * descriptor)
{
    apr_uint32_t i;
    apr_os_sock_t fd;
    apr_status_t rv = APR_NOTFOUND;

    if (descriptor->desc_type == APR_POLL_SOCKET)
        fd = descriptor->desc.s->socketdes;
    else
        return APR_EBADF;
    if (pollset->mutex)
        EnterCriticalSection(pollset->mutex);
    for (i = 0; i < pollset->nelts; i++) {
        if (descriptor->desc.s == pollset->query_set[i].desc.s) {
            /* Found an instance of the fd: remove this and any other copies */
            apr_uint32_t dst = i;
            apr_uint32_t old_nelts = pollset->nelts;
            pollset->nelts--;
            /* Close the socket event */
            if (pollset->hevents[i])
                WSACloseEvent(pollset->hevents[i]);
            pollset->hevents[i] = NULL;
            for (i++; i < old_nelts; i++) {
                if (descriptor->desc.s == pollset->query_set[i].desc.s) {
                    /* Close duplicate events */
                    if (pollset->hevents[i])
                        WSACloseEvent(pollset->hevents[i]);
                    pollset->hevents[i] = NULL;
                    pollset->nelts--;
                }
                else {
                    pollset->query_set[dst] = pollset->query_set[i];
                    dst++;
                }
            }
            if (((int) fd == pollset->maxfd) && (pollset->maxfd > 0)) {
                pollset->maxfd--;
            }
            rv = APR_SUCCESS;
            break;
        }
    }
    if (pollset->mutex)
        LeaveCriticalSection(pollset->mutex);

    return rv;
}

APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
                                           apr_interval_time_t timeout,
                                           apr_int32_t *num,
                                           const apr_pollfd_t **descriptors)
{
    apr_uint32_t i, j = 0;
    apr_time_t stop_time;
    DWORD dw = WAIT_TIMEOUT;
    DWORD sleep_time = 0;
    if (timeout < 0)
        timeout = 0;
    stop_time = apr_time_now() + timeout;
    do {
        j = 0;
        if (sleep_time) {
            Sleep(sleep_time);
            /* Progresively increase sleep timeout
             * by 10%.
             */
            if (sleep_time < 1000)
                sleep_time += 10;
        }
        else
            sleep_time = 10;
        if (pollset->mutex)
            EnterCriticalSection(pollset->mutex);
        for (i = 0; i < pollset->nelts; i++) {
            WSANETWORKEVENTS ne;
            apr_int16_t rtnevents = 0;
            /* remove might invalidate the handle */
            if (pollset->hevents[i] == NULL ||
                (WSAEnumNetworkEvents(pollset->sockets[i],
                                     pollset->hevents[i],
                                     &ne) == SOCKET_ERROR)) {
                rtnevents = APR_POLLERR;
                ne.lNetworkEvents = 0;
            }
            if (ne.lNetworkEvents & FD_ACCEPT)
                rtnevents |= APR_POLLIN;
            if (ne.lNetworkEvents & FD_READ)
                rtnevents |= APR_POLLIN;
            if (ne.lNetworkEvents & FD_WRITE)
                rtnevents |= APR_POLLOUT;
            if (ne.lNetworkEvents & FD_QOS)
                rtnevents |= APR_POLLPRI;
            if (ne.lNetworkEvents & FD_CLOSE)
                rtnevents |= APR_POLLHUP;
            if (rtnevents) {
                pollset->result_set[j] = pollset->query_set[i];
                pollset->result_set[j].rtnevents = rtnevents;
                j++;
            }
        }
        if (pollset->mutex)
            LeaveCriticalSection(pollset->mutex);
        if (j > 0)
            break;
    } while ((apr_time_now() < stop_time));
    if (descriptors)
        *descriptors = pollset->result_set;
    (*num) = j;

    return APR_SUCCESS;
}

#endif /* POLL_USES_WEVENT */

Reply via email to