Here is an updated worker patch.  It fixes a race condition
that the first patch missed: the listener thread could push
multiple connections onto the fd queue between the cond_signal
and the subsequent wakeup of the next worker.
This caused problems because the idle worker count, which the
listener used to decide when to accept more connections, didn't
get decremented until the worker woke up.  Thus the listener
could overflow the connection queue.

The fix, as implemented in this new patch, is to make the listener
block if either: 1) there are no idle workers, or 2) the queue is
full.

This patch isn't a complete fix, as there is now an error case
in which the listener thread fails to exit during shutdown.  It
needs some more testing and cleanup work.

Meanwhile, I'm going to take another look at leader/follower,
because IMHO the worker synchronization logic is getting far too
complicated.

--Brian

Index: server/mpm/worker/fdqueue.h
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v
retrieving revision 1.19
diff -u -r1.19 fdqueue.h
--- server/mpm/worker/fdqueue.h 28 Apr 2002 23:12:35 -0000      1.19
+++ server/mpm/worker/fdqueue.h 21 May 2002 23:57:16 -0000
@@ -71,16 +71,6 @@
 #endif
 #include <apr_errno.h>
 
-typedef struct fd_queue_info_t fd_queue_info_t;
-
-apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
-                                  apr_pool_t *pool, int max_idlers);
-apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,
-                                    apr_pool_t *pool_to_recycle);
-apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
-                                          apr_pool_t **recycled_pool);
-apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info);
-
 struct fd_queue_elem_t {
     apr_socket_t      *sd;
     apr_pool_t        *p;
@@ -94,13 +84,19 @@
     apr_thread_mutex_t *one_big_mutex;
     apr_thread_cond_t  *not_empty;
     int                 terminated;
+    int                 idlers;
+    apr_thread_cond_t  *idlers_available;
+    apr_pool_t        **recycled_pools;
+    int num_recycled;
 };
 typedef struct fd_queue_t fd_queue_t;
 
 apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
 apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p,
+                          apr_pool_t **recycled);
 apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
+apr_status_t ap_queue_wait_for_idler(fd_queue_t *queue, apr_pool_t **recycled);
 apr_status_t ap_queue_term(fd_queue_t *queue);
 
 #endif /* FDQUEUE_H */
Index: server/mpm/worker/fdqueue.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v
retrieving revision 1.22
diff -u -r1.22 fdqueue.c
--- server/mpm/worker/fdqueue.c 29 Apr 2002 01:57:39 -0000      1.22
+++ server/mpm/worker/fdqueue.c 21 May 2002 23:57:16 -0000
@@ -58,138 +58,6 @@
 
 #include "fdqueue.h"
 
-struct fd_queue_info_t {
-    int idlers;
-    apr_thread_mutex_t *idlers_mutex;
-    apr_thread_cond_t *wait_for_idler;
-    int terminated;
-    int max_idlers;
-    apr_pool_t        **recycled_pools;
-    int num_recycled;
-};
-
-static apr_status_t queue_info_cleanup(void *data_)
-{
-    fd_queue_info_t *qi = data_;
-    int i;
-    apr_thread_cond_destroy(qi->wait_for_idler);
-    apr_thread_mutex_destroy(qi->idlers_mutex);
-    for (i = 0; i < qi->num_recycled; i++) {
-        apr_pool_destroy(qi->recycled_pools[i]);
-    }
-    return APR_SUCCESS;
-}
-
-apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
-                                  apr_pool_t *pool, int max_idlers)
-{
-    apr_status_t rv;
-    fd_queue_info_t *qi;
-
-    qi = apr_palloc(pool, sizeof(*qi));
-    memset(qi, 0, sizeof(*qi));
-
-    rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
-                                 pool);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    qi->recycled_pools = (apr_pool_t **)apr_palloc(pool, max_idlers *
-                                                   sizeof(apr_pool_t *));
-    qi->num_recycled = 0;
-    qi->max_idlers = max_idlers;
-    apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
-                              apr_pool_cleanup_null);
-
-    *queue_info = qi;
-
-    return APR_SUCCESS;
-}
-
-apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,
-                                    apr_pool_t *pool_to_recycle)
-{
-    apr_status_t rv;
-    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    AP_DEBUG_ASSERT(queue_info->idlers >= 0);
-    AP_DEBUG_ASSERT(queue_info->num_recycled < queue_info->max_idlers);
-    if (pool_to_recycle) {
-        queue_info->recycled_pools[queue_info->num_recycled++] =
-            pool_to_recycle;
-    }
-    if (queue_info->idlers++ == 0) {
-        /* Only signal if we had no idlers before. */
-        apr_thread_cond_signal(queue_info->wait_for_idler);
-    }
-    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    return APR_SUCCESS;
-}
-
-apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
-                                          apr_pool_t **recycled_pool)
-{
-    apr_status_t rv;
-    *recycled_pool = NULL;
-    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    AP_DEBUG_ASSERT(queue_info->idlers >= 0);
-    while ((queue_info->idlers == 0) && (!queue_info->terminated)) {
-        rv = apr_thread_cond_wait(queue_info->wait_for_idler,
-                                  queue_info->idlers_mutex);
-        if (rv != APR_SUCCESS) {
-            apr_status_t rv2;
-            rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex);
-            if (rv2 != APR_SUCCESS) {
-                return rv2;
-            }
-            return rv;
-        }
-    }
-    queue_info->idlers--; /* Oh, and idler? Let's take 'em! */
-    if (queue_info->num_recycled) {
-        *recycled_pool =
-            queue_info->recycled_pools[--queue_info->num_recycled];
-    }
-    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    else if (queue_info->terminated) {
-        return APR_EOF;
-    }
-    else {
-        return APR_SUCCESS;
-    }
-}
-
-apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info)
-{
-    apr_status_t rv;
-    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    queue_info->terminated = 1;
-    apr_thread_cond_broadcast(queue_info->wait_for_idler);
-    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    return APR_SUCCESS;
-}
-
 /**
  * Detects when the fd_queue_t is full. This utility function is expected
  * to be called from within critical sections, and is not threadsafe.
@@ -209,13 +77,17 @@
 static apr_status_t ap_queue_destroy(void *data) 
 {
     fd_queue_t *queue = data;
+    int i;
 
     /* Ignore errors here, we can't do anything about them anyway.
      * XXX: We should at least try to signal an error here, it is
      * indicative of a programmer error. -aaron */
     apr_thread_cond_destroy(queue->not_empty);
+    apr_thread_cond_destroy(queue->idlers_available);
     apr_thread_mutex_destroy(queue->one_big_mutex);
-
+    for (i = 0; i < queue->num_recycled; i++) {
+        apr_pool_destroy(queue->recycled_pools[i]);
+    }
     return APR_SUCCESS;
 }
 
@@ -234,15 +106,25 @@
     if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) {
         return rv;
     }
+    if ((rv = apr_thread_cond_create(&queue->idlers_available, a)) !=
+        APR_SUCCESS) {
+        return rv;
+    }
 
     queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
     queue->bounds = queue_capacity;
     queue->nelts = 0;
+    queue->terminated = 0;
+    queue->idlers = 0;
 
     /* Set all the sockets in the queue to NULL */
     for (i = 0; i < queue_capacity; ++i)
         queue->data[i].sd = NULL;
 
+    queue->recycled_pools = (apr_pool_t **)apr_palloc(a, queue->bounds *
+                                                      sizeof(apr_pool_t *));
+    queue->num_recycled = 0;
+
     apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
 
     return APR_SUCCESS;
@@ -285,7 +167,8 @@
  * Once retrieved, the socket is placed into the address specified by
  * 'sd'.
  */
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p,
+                          apr_pool_t **recycled)
 {
     fd_queue_elem_t *elem;
     apr_status_t rv;
@@ -294,6 +177,21 @@
         return rv;
     }
 
+    if (recycled && *recycled) {
+        if (queue->num_recycled == queue->bounds) {
+            apr_pool_destroy(*recycled);
+        }
+        else {
+            queue->recycled_pools[queue->num_recycled++] = *recycled;
+        }
+        *recycled = NULL;
+    }
+
+    queue->idlers++;
+    if (queue->idlers == 1) {
+        apr_thread_cond_signal(queue->idlers_available);
+    }
+
     /* Keep waiting until we wake up and find that the queue is not empty. */
     if (ap_queue_empty(queue)) {
         if (!queue->terminated) {
@@ -301,6 +199,7 @@
         }
         /* If we wake up and it's still empty, then we were interrupted */
         if (ap_queue_empty(queue)) {
+            queue->idlers--;
             rv = apr_thread_mutex_unlock(queue->one_big_mutex);
             if (rv != APR_SUCCESS) {
                 return rv;
@@ -312,7 +211,10 @@
                 return APR_EINTR;
             }
         }
-    } 
+    }
+    else if (queue->nelts == queue->bounds) {
+        apr_thread_cond_signal(queue->idlers_available);
+    }
 
     elem = &queue->data[--queue->nelts];
     *sd = elem->sd;
@@ -322,6 +224,7 @@
     elem->p = NULL;
 #endif /* AP_DEBUG */
 
+    queue->idlers--;
     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
     return rv;
 }
@@ -338,6 +241,28 @@
         return rv;
     }
     return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_wait_for_idler(fd_queue_t *queue,
+                                     apr_pool_t **recycled_pool)
+{
+    apr_status_t rv;
+    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    while (!queue->terminated && (queue->idlers == 0 ||
+                                  queue->nelts == queue->bounds)) {
+        apr_thread_cond_wait(queue->idlers_available, queue->one_big_mutex);
+    }
+    if (recycled_pool && *recycled_pool) {
+        if (queue->num_recycled > 0) {
+            *recycled_pool = queue->recycled_pools[--queue->num_recycled];
+        }
+        else {
+            *recycled_pool = NULL;
+        }
+    }
+    return apr_thread_mutex_unlock(queue->one_big_mutex);
 }
 
 apr_status_t ap_queue_term(fd_queue_t *queue)
Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.124
diff -u -r1.124 worker.c
--- server/mpm/worker/worker.c  17 May 2002 11:11:39 -0000      1.124
+++ server/mpm/worker/worker.c  21 May 2002 23:57:17 -0000
@@ -173,7 +173,6 @@
 static int num_listensocks = 0;
 static int resource_shortage = 0;
 static fd_queue_t *worker_queue;
-static fd_queue_info_t *worker_queue_info;
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
@@ -315,7 +314,6 @@
     if (mode == ST_UNGRACEFUL) {
         workers_may_exit = 1;
         ap_queue_interrupt_all(worker_queue);
-        ap_queue_info_term(worker_queue_info);
         close_worker_sockets(); /* forcefully kill all current connections */
     }
 }
@@ -711,8 +709,7 @@
         }
         if (listener_may_exit) break;
 
-        rv = ap_queue_info_wait_for_idler(worker_queue_info,
-                                          &recycled_pool);
+        rv = ap_queue_wait_for_idler(worker_queue, &recycled_pool);
         if (APR_STATUS_IS_EOF(rv)) {
             break; /* we've been signaled to die now */
         }
@@ -796,6 +793,7 @@
             }
             else {
                 ptrans = recycled_pool;
+                recycled_pool = NULL;
             }
             apr_pool_tag(ptrans, "transaction");
             rv = lr->accept_func(&csd, lr, ptrans);
@@ -883,22 +881,13 @@
     bucket_alloc = apr_bucket_alloc_create(apr_thread_pool_get(thd));
 
     while (!workers_may_exit) {
-        rv = ap_queue_info_set_idle(worker_queue_info, last_ptrans);
         last_ptrans = NULL;
-        if (rv != APR_SUCCESS) {
-            ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
-                         "ap_queue_info_set_idle failed. Attempting to "
-                         "shutdown process gracefully.");
-            signal_threads(ST_GRACEFUL);
-            break;
-        }
-
         ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_READY, NULL);
 worker_pop:
         if (workers_may_exit) {
             break;
         }
-        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
+        rv = ap_queue_pop(worker_queue, &csd, &ptrans, &last_ptrans);
 
         if (rv != APR_SUCCESS) {
             /* We get APR_EOF during a graceful shutdown once all the connections
@@ -1010,14 +999,6 @@
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
                      "ap_queue_init() failed");
-        clean_child_exit(APEXIT_CHILDFATAL);
-    }
-
-    rv = ap_queue_info_create(&worker_queue_info, pchild,
-                              ap_threads_per_child);
-    if (rv != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
-                     "ap_queue_info_create() failed");
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 

Reply via email to