On Wed, Nov 16, 2011 at 1:20 AM, Rüdiger Plüm <[email protected]> wrote: > > > -------- Original-Nachricht -------- > Betreff: svn commit: r1202257 - in /httpd/httpd/trunk/server/mpm/event: > config3.m4 equeue.c equeue.h event.c > Datum: Tue, 15 Nov 2011 15:51:04 GMT > Von: [email protected] > > Author: pquerna > Date: Tue Nov 15 15:51:03 2011 > New Revision: 1202257 > > URL: http://svn.apache.org/viewvc?rev=1202257&view=rev > Log: > Create a new lock free circular queue, and use it in the EventMPM to remove > the timeout mutex > that was wrapping both timeout queue operations and pollset operations. > > Added: > httpd/httpd/trunk/server/mpm/event/equeue.c (with props) > httpd/httpd/trunk/server/mpm/event/equeue.h (with props) > Modified: > httpd/httpd/trunk/server/mpm/event/config3.m4 > httpd/httpd/trunk/server/mpm/ > event/event.c > > Modified: httpd/httpd/trunk/server/mpm/event/config3.m4 > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/equeue.c?rev=1202257&view=auto > ============================================================================== > --- httpd/httpd/trunk/server/mpm/event/equeue.c (added) > +++ httpd/httpd/trunk/server/mpm/event/equeue.c Tue Nov 15 15:51:03 2011 > @@ -0,0 +1,125 @@ > +/* Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#include "equeue.h" > + > +#include <apr_atomic.h> > +#include <sched.h> > + > +struct ap_equeue_t { > + apr_uint32_t nelem; > + apr_size_t elem_size; > + uint8_t *bytes; > + volatile apr_uint32_t writeCount; > + volatile apr_uint32_t readCount; > +}; > + > + > +static APR_INLINE apr_uint32_t count_to_index(ap_equeue_t *eq, apr_uint32_t > count) > +{ > + return (count & (eq->nelem - 1)); > +} > + > +static APR_INLINE void* index_to_bytes(ap_equeue_t *eq, apr_uint32_t idx) > +{ > + apr_size_t offset = idx * eq->elem_size; > + return (void*)&eq->bytes[offset]; > +} > + > +static APR_INLINE apr_uint32_t nearest_power(apr_uint32_t num) > +{ > + apr_uint32_t n = 1; > + while (n < num) { > + n <<= 1; > + } > + > + return n; > +} > + > +#if 0 > +static void dump_queue(ap_equeue_t *eq) > +{ > + apr_uint32_t i; > + > + fprintf(stderr, "dumping %p\n", eq); > + fprintf(stderr, " nelem: %u\n", eq->nelem); > + fprintf(stderr, " esize: %"APR_SIZE_T_FMT"\n", eq->elem_size); > + fprintf(stderr, " wcnt: %u\n", eq->writeCount); > + fprintf(stderr, " rcnt: %u\n", eq->writeCount); > + fprintf(stderr, " bytes: %p\n", eq->bytes); > + for (i = 0; i < eq->nelem; i++) { > + fprintf(stderr, " [%u] = %p\n", i, index_to_bytes(eq, i)); > + } > + > + fprintf(stderr, "\n"); > + fflush(stderr); > +} > +#endif > + > +apr_status_t > +ap_equeue_create(apr_pool_t *p, apr_uint32_t nelem, apr_size_t elem_size, > ap_equeue_t **eqout) > +{ > + ap_equeue_t *eq; > + > + *eqout = NULL; > + > + eq = apr_palloc(p, sizeof(ap_equeue_t)); > + eq->bytes = apr_palloc(p, (1 + nelem) * elem_size); > + eq->nelem = nearest_power(nelem); > + eq->elem_size = elem_size; > + eq->writeCount = 0; > + eq->readCount = 0; > + *eqout = eq; > + > + return APR_SUCCESS; > +} > + > +void * > +ap_equeue_reader_next(ap_equeue_t *eq) > +{ > + if (apr_atomic_read32(&eq->writeCount) == eq->readCount) { > + return NULL; > + } > + else { > + apr_uint32_t idx = count_to_index(eq, > apr_atomic_inc32(&eq->readCount)); > + return index_to_bytes(eq, idx); > + } > +} > + > +void * > +ap_equeue_writer_value(ap_equeue_t *eq) > +{ > + apr_uint32_t idx; > + > + while (1) { > + apr_uint32_t readCount = apr_atomic_read32(&eq->readCount); > + > + if (count_to_index(eq, eq->writeCount + 1) != count_to_index(eq, > readCount)) { > + break; > + } > + /* TODO: research if sched_yield is even worth doing */ > + sched_yield(); > + } > + > + idx = count_to_index(eq, eq->writeCount); > + return index_to_bytes(eq, idx); > +} > > I assume that only a single thread tries write operations on this queue, > correct? > Otherwise it seems unsafe if another thread calls > ap_equeue_writer_value in parallel as it would return the same slot until > ap_equeue_writer_onward was called.
Correct, it only supports a single writer and a single reader, which lets you get away with only a few atomic operations and no mutexes.
