Based on the "slow Apache 2.0" thread earlier today,
and my observation therein that it's possible for a
worker child process to block on a full file descriptor
queue (all threads busy) while other child procs have
idle threads, I decided to revive the idea of switching
the worker thread management to a leader/followers
pattern.
The way it works is:
  * There's no dedicated listener thread.  The workers
    take turns serving as the listener.
  * Idle threads are listed in a stack.  Each thread has
    a condition variable.  When the current listener
    accepts a connection, it pops the next idle thread
    from the stack and wakes it up using the condition
    variable.  The newly awakened thread becomes the
    new listener.
  * If there is no idle thread available to become
    the new listener, the next thread to finish handling
    its current connection takes over as listener.
    (Thus a process that's already saturated with
    connections won't call accept() until it actually
    has an idle thread available.)

In order to implement the patch quickly, I've used a
mutex to guard the stack for now, rather than using
atomic compare-and-swap operations like I'd once
proposed.  In order to improve scalability, though,
this mutex is *not* used for the condition variable
signaling.  Instead, each worker thread has a private
mutex for use with its condition variable.  This
thread-private mutex is locked at thread creation,
and the only subsequent operations on it are those
done implicitly by the cond_signal/cond_wait.  Thus
only the thread associated with that mutex ever locks
or unlocks it, which should help to reduce synchronization
overhead.  (The design is dependent on the semantics
of the one-listener-at-a-time model to synchronize
the cond_signal with the cond_wait.)

Can I get a few volunteers to test/review this?

Thanks,
--Brian

Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.114
diff -u -r1.114 worker.c
--- server/mpm/worker/worker.c  8 Apr 2002 16:57:06 -0000       1.114
+++ server/mpm/worker/worker.c  10 Apr 2002 05:22:03 -0000
@@ -69,6 +69,7 @@
 #include "apr_file_io.h"
 #include "apr_thread_proc.h"
 #include "apr_signal.h"
+#include "apr_thread_cond.h"
 #include "apr_thread_mutex.h"
 #include "apr_proc_mutex.h"
 #define APR_WANT_STRFUNC
@@ -105,7 +106,6 @@
 #include "mpm_common.h"
 #include "ap_listen.h"
 #include "scoreboard.h" 
-#include "fdqueue.h"
 #include "mpm_default.h"
 
 #include <signal.h>
@@ -168,11 +168,9 @@
 static int dying = 0;
 static int workers_may_exit = 0;
 static int start_thread_may_exit = 0;
-static int listener_may_exit = 0;
 static int requests_this_child;
 static int num_listensocks = 0;
 static int resource_shortage = 0;
-static fd_queue_t *worker_queue;
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
@@ -181,12 +179,12 @@
     int sd;
 } proc_info;
 
+
 /* Structure used to pass information to the thread responsible for 
  * creating the rest of the threads.
  */
 typedef struct {
     apr_thread_t **threads;
-    apr_thread_t *listener;
     int child_num_arg;
     apr_threadattr_t *threadattr;
 } thread_starter;
@@ -232,7 +230,6 @@
 static pid_t ap_my_pid; /* Linux getpid() doesn't work except in main 
                            thread. Use this instead */
 static pid_t parent_pid;
-static apr_os_thread_t *listener_os_thread;
 
 /* Locks for accept serialization */
 static apr_proc_mutex_t *accept_mutex;
@@ -243,37 +240,103 @@
 #define SAFE_ACCEPT(stmt) (stmt)
 #endif
 
-/* The LISTENER_SIGNAL signal will be sent from the main thread to the 
- * listener thread to wake it up for graceful termination (what a child 
- * process from an old generation does when the admin does "apachectl 
- * graceful").  This signal will be blocked in all threads of a child
- * process except for the listener thread.
+
+/* Structure used to wake up an idle worker thread
+ */
+typedef struct {
+    apr_thread_cond_t *cond;    /* the idle thread blocks on this */
+    apr_thread_mutex_t *mutex;  /* handed to apr_thread_cond_wait with cond */
+} worker_wakeup_info;
+
+/* Structure used to hold a stack of idle worker threads 
  */
-#define LISTENER_SIGNAL     SIGHUP
+typedef struct {
+    apr_thread_mutex_t *mutex;      /* locked around access to the fields below */
+    int no_listener;                /* nonzero while no thread is the listener */
+    worker_wakeup_info **stack;     /* wakeup records of the idle workers */
+    apr_size_t nelts;               /* entries currently on the stack */
+    apr_size_t nalloc;              /* capacity of the stack array */
+} worker_stack;
+
+/* Allocate an empty idle-worker stack with room for max entries from pool.
+ * Returns NULL if the mutex that guards the stack cannot be created.
+ */
+static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max)
+{
+    worker_stack *ws = apr_palloc(pool, sizeof(*ws));
+
+    if (apr_thread_mutex_create(&ws->mutex, APR_THREAD_MUTEX_DEFAULT,
+                                pool) != APR_SUCCESS) {
+        return NULL;
+    }
+    ws->no_listener = 1;    /* the first thread to wait becomes the listener */
+    ws->nelts = 0;
+    ws->nalloc = max;
+    ws->stack = apr_palloc(pool, max * sizeof(*ws->stack));
+    return ws;
+}
 
-static void wakeup_listener(void)
+/* Block until the calling thread should serve as listener.  Returns at once
+ * if there is no listener; returns APR_ENOSPC if the idle stack is full.
+ */
+static apr_status_t worker_stack_wait(worker_stack *stack,
+                                      worker_wakeup_info *wakeup)
 {
-    listener_may_exit = 1;
-    if (!listener_os_thread) {
-        /* XXX there is an obscure path that this doesn't handle perfectly:
-         *     right after listener thread is created but before 
-         *     listener_os_thread is set, the first worker thread hits an
-         *     error and starts graceful termination
+    apr_status_t rv;
+    if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    if (stack->no_listener) {
+        /* this thread should become the new listener immediately */
+        stack->no_listener = 0;
+        return apr_thread_mutex_unlock(stack->mutex);
+    }
+    if (stack->nelts == stack->nalloc) {
+        /* stack is full: release the lock before reporting the error
          */
-        return;
+        apr_thread_mutex_unlock(stack->mutex);
+        return APR_ENOSPC;
+    }
+    /* push this thread onto the stack of idle workers */
+    stack->stack[stack->nelts++] = wakeup;
+    if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    /* NOTE(review): a signal sent before this wait starts would be lost */
+    if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) !=
+        APR_SUCCESS) {
+        return rv;
     }
-    /*
-     * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
-     * platforms and wake up the listener thread since it is the only thread 
-     * with SIGHUP unblocked, but that doesn't work on Linux
-     */
-#ifdef HAVE_PTHREAD_KILL
-    pthread_kill(*listener_os_thread, LISTENER_SIGNAL);
-#else
-    kill(ap_my_pid, LISTENER_SIGNAL);
-#endif
+    return APR_SUCCESS;
 }
 
+/* Wake the most recently idled worker so it becomes the listener.  If no
+ * worker is idle, flag the stack so the next thread to wait takes over.
+ */
+static apr_status_t worker_stack_awaken_next(worker_stack *stack)
+{
+    apr_status_t rv;
+    worker_wakeup_info *wakeup = NULL;
+
+    if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    if (stack->nelts) {
+        wakeup = stack->stack[--stack->nelts];
+    }
+    else {
+        stack->no_listener = 1;
+    }
+    if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    /* stack->mutex is released exactly once above, so a failed signal
+     * must not unlock it a second time */
+    return wakeup ? apr_thread_cond_signal(wakeup->cond) : APR_SUCCESS;
+}
+
+static worker_stack *idle_worker_stack;
+
 #define ST_INIT              0
 #define ST_GRACEFUL          1
 #define ST_UNGRACEFUL        2
@@ -282,23 +345,15 @@
 
 static void signal_threads(int mode)
 {
+    int i;
     if (terminate_mode == mode) {
         return;
     }
     terminate_mode = mode;
 
-    /* in case we weren't called from the listener thread, wake up the
-     * listener thread
-     */
-    wakeup_listener();
-
-    /* for ungraceful termination, let the workers exit now;
-     * for graceful termination, the listener thread will notify the
-     * workers to exit once it has stopped accepting new connections
-     */
-    if (mode == ST_UNGRACEFUL) {
-        workers_may_exit = 1;
-        ap_queue_interrupt_all(worker_queue);
+    workers_may_exit = 1;  /* now set for every mode, graceful or not */
+    for (i = 0; i < ap_threads_per_child; i++) {  /* one wakeup attempt per worker slot */
+        (void)worker_stack_awaken_next(idle_worker_stack);  /* status ignored */
     }
 }
 
@@ -576,10 +631,7 @@
      * maybe it should be ap_mpm_process_exiting?
      */
 {
-    /* note: for a graceful termination, listener_may_exit will be set before
-     *       workers_may_exit, so check listener_may_exit
-     */
-    return listener_may_exit;
+    return workers_may_exit;
 }
 
 /*****************************************************************
@@ -659,45 +711,83 @@
      */
 }
 
-static void *listener_thread(apr_thread_t *thd, void * dummy)
+static void *worker_thread(apr_thread_t *thd, void * dummy)
 {
     proc_info * ti = dummy;
     int process_slot = ti->pid;
+    int thread_slot = ti->tid;
     apr_pool_t *tpool = apr_thread_pool_get(thd);
     void *csd = NULL;
+    apr_allocator_t *allocator;
     apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
-    apr_pool_t *recycled_pool = NULL;
+    apr_bucket_alloc_t *bucket_alloc;
     int n;
     apr_pollfd_t *pollset;
     apr_status_t rv;
     ap_listen_rec *lr, *last_lr = ap_listeners;
+    worker_wakeup_info *wakeup;
+    int is_listener;
+
+    ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL);
 
     free(ti);
 
+    apr_allocator_create(&allocator);
+    apr_pool_create_ex(&ptrans, NULL, NULL, allocator);
+    apr_allocator_set_owner(allocator, ptrans);
+    bucket_alloc = apr_bucket_alloc_create(tpool);
+
+    wakeup = (worker_wakeup_info *)apr_palloc(tpool, sizeof(*wakeup));
+    if ((rv = apr_thread_cond_create(&wakeup->cond, tpool)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_cond_create failed. Attempting to shutdown "
+                     "process gracefully.");
+        signal_threads(ST_GRACEFUL);
+        goto done;
+    }
+    if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
+                                      tpool)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_create failed. Attempting to shutdown "
+                     "process gracefully.");
+        signal_threads(ST_GRACEFUL);
+        goto done;
+    }
+    apr_thread_mutex_lock(wakeup->mutex);
+
     apr_poll_setup(&pollset, num_listensocks, tpool);
     for(lr = ap_listeners ; lr != NULL ; lr = lr->next)
         apr_poll_socket_add(pollset, lr->sd, APR_POLLIN);
 
-    /* Unblock the signal used to wake this thread up, and set a handler for
-     * it.
-     */
-    unblock_signal(LISTENER_SIGNAL);
-    apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
-
     /* TODO: Switch to a system where threads reuse the results from earlier
        poll calls - manoj */
-    while (1) {
+    is_listener = 0;
+    while (!workers_may_exit) {
+
+        ap_update_child_status_from_indexes(process_slot, thread_slot,
+                                            SERVER_READY, NULL);
+        if (!is_listener) {
+            /* Wait until it's our turn to become the listener */
+            if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) !=
+                APR_SUCCESS) {
+                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                             "worker_stack_wait failed. Shutting down");
+                break;
+            }
+            is_listener = 1;
+        }
+
         /* TODO: requests_this_child should be synchronized - aaron */
         if (requests_this_child <= 0) {
             check_infinite_requests();
         }
-        if (listener_may_exit) break;
+        if (workers_may_exit) break;
 
         if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex)))
             != APR_SUCCESS) {
             int level = APLOG_EMERG;
 
-            if (listener_may_exit) {
+            if (workers_may_exit) {
                 break;
             }
             if (ap_scoreboard_image->parent[process_slot].generation != 
@@ -716,7 +806,7 @@
             lr = ap_listeners;
         }
         else {
-            while (!listener_may_exit) {
+            while (!workers_may_exit) {
                 apr_status_t ret;
                 apr_int16_t event;
 
@@ -733,7 +823,7 @@
                     signal_threads(ST_GRACEFUL);
                 }
 
-                if (listener_may_exit) break;
+                if (workers_may_exit) break;
 
                 /* find a listener */
                 lr = last_lr;
@@ -752,19 +842,7 @@
             }
         }
     got_fd:
-        if (!listener_may_exit) {
-            /* create a new transaction pool for each accepted socket */
-            if (recycled_pool == NULL) {
-                apr_allocator_t *allocator;
-
-                apr_allocator_create(&allocator);
-                apr_pool_create_ex(&ptrans, NULL, NULL, allocator);
-                apr_allocator_set_owner(allocator, ptrans);
-            }
-            else {
-                ptrans = recycled_pool;
-            }
-            apr_pool_tag(ptrans, "transaction");
+        if (!workers_may_exit) {
             rv = lr->accept_func(&csd, lr, ptrans);
 
             /* If we were interrupted for whatever reason, just start
@@ -782,7 +860,7 @@
                 != APR_SUCCESS) {
                 int level = APLOG_EMERG;
 
-                if (listener_may_exit) {
+                if (workers_may_exit) {
                     break;
                 }
                 if (ap_scoreboard_image->parent[process_slot].generation != 
@@ -795,16 +873,13 @@
                 signal_threads(ST_GRACEFUL);
             }
             if (csd != NULL) {
-                rv = ap_queue_push(worker_queue, csd, ptrans,
-                                   &recycled_pool);
-                if (rv) {
-                    /* trash the connection; we couldn't queue the connected
-                     * socket to a worker 
-                     */
-                    apr_socket_close(csd);
-                    ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
-                                 "ap_queue_push failed");
-                }
+                is_listener = 0;
+                worker_stack_awaken_next(idle_worker_stack);
+                process_socket(ptrans, csd, process_slot,
+                               thread_slot, bucket_alloc);
+                apr_pool_clear(ptrans);
+                requests_this_child--;
+                apr_socket_close(csd);  /* Debug only */
             }
         }
         else {
@@ -819,75 +894,10 @@
         }
     }
 
-    ap_queue_term(worker_queue);
+ done:
     dying = 1;
     ap_scoreboard_image->parent[process_slot].quiescing = 1;
-
-    /* wake up the main thread */
-    kill(ap_my_pid, SIGTERM);
-
-    apr_thread_exit(thd, APR_SUCCESS);
-    return NULL;
-}
-
-/* XXX For ungraceful termination/restart, we definitely don't want to
- *     wait for active connections to finish but we may want to wait
- *     for idle workers to get out of the queue code and release mutexes,
- *     since those mutexes are cleaned up pretty soon and some systems
- *     may not react favorably (i.e., segfault) if operations are attempted
- *     on cleaned-up mutexes.
- */
-static void * APR_THREAD_FUNC worker_thread(apr_thread_t *thd, void * dummy)
-{
-    proc_info * ti = dummy;
-    int process_slot = ti->pid;
-    int thread_slot = ti->tid;
-    apr_socket_t *csd = NULL;
-    apr_bucket_alloc_t *bucket_alloc;
-    apr_pool_t *last_ptrans = NULL;
-    apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
-    apr_status_t rv;
-
-    free(ti);
-
-    ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL);
-
-    bucket_alloc = apr_bucket_alloc_create(apr_thread_pool_get(thd));
-
-    while (!workers_may_exit) {
-        ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_READY, NULL);
-        rv = ap_queue_pop(worker_queue, &csd, &ptrans, last_ptrans);
-        last_ptrans = NULL;
-
-        if (rv != APR_SUCCESS) {
-            /* We get APR_EOF during a graceful shutdown once all the connections
-             * accepted by this server process have been handled.
-             */
-            if (rv == APR_EOF) {
-                break;
-            }
-            /* We get APR_EINTR whenever ap_queue_pop() has been interrupted
-             * from an explicit call to ap_queue_interrupt_all(). This allows
-             * us to unblock threads stuck in ap_queue_pop() when a shutdown
-             * is pending.
-             *
-             * If workers_may_exit is set and this is ungraceful termination/
-             * restart, we are bound to get an error on some systems (e.g.,
-             * AIX, which sanity-checks mutex operations) since the queue
-             * may have already been cleaned up.  Don't log the "error" if
-             * workers_may_exit is set.
-             */
-            if (rv != APR_EINTR && !workers_may_exit) {
-                ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
-                             "ap_queue_pop failed");
-            }
-            continue;
-        }
-        process_socket(ptrans, csd, process_slot, thread_slot, bucket_alloc);
-        requests_this_child--; /* FIXME: should be synchronized - aaron */
-        apr_pool_clear(ptrans);
-        last_ptrans = ptrans;
-    }
+    worker_stack_awaken_next(idle_worker_stack);
 
     ap_update_child_status_from_indexes(process_slot, thread_slot,
         (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
@@ -908,34 +918,6 @@
     return 0;
 }
 
-static void create_listener_thread(thread_starter *ts)
-{
-    int my_child_num = ts->child_num_arg;
-    apr_threadattr_t *thread_attr = ts->threadattr;
-    proc_info *my_info;
-    apr_status_t rv;
-
-    my_info = (proc_info *)malloc(sizeof(proc_info));
-    my_info->pid = my_child_num;
-    my_info->tid = -1; /* listener thread doesn't have a thread slot */
-    my_info->sd = 0;
-    rv = apr_thread_create(&ts->listener, thread_attr, listener_thread,
-                           my_info, pchild);
-    if (rv != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
-                     "apr_thread_create: unable to create listener thread");
-        /* In case system resources are maxxed out, we don't want
-         * Apache running away with the CPU trying to fork over and
-         * over and over again if we exit.
-         * XXX Jeff doesn't see how Apache is going to try to fork again since
-         * the exit code is APEXIT_CHILDFATAL
-         */
-        apr_sleep(10 * APR_USEC_PER_SEC);
-        clean_child_exit(APEXIT_CHILDFATAL);
-    }
-    apr_os_thread_get(&listener_os_thread, ts->listener);
-}
-
 /* XXX under some circumstances not understood, children can get stuck
  *     in start_threads forever trying to take over slots which will
  *     never be cleaned up; for now there is an APLOG_DEBUG message issued
@@ -955,19 +937,15 @@
     int loops;
     int prev_threads_created;
 
-    /* We must create the fd queues before we start up the listener
-     * and worker threads. */
-    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
-    rv = ap_queue_init(worker_queue, ap_threads_per_child, pchild);
-    if (rv != APR_SUCCESS) {
+    idle_worker_stack = worker_stack_create(pchild, ap_threads_per_child);
+    if (idle_worker_stack == NULL) {
         ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
-                     "ap_queue_init() failed");
+                     "worker_stack_create() failed");
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 
     loops = prev_threads_created = 0;
     while (1) {
-        /* ap_threads_per_child does not include the listener thread */
         for (i = 0; i < ap_threads_per_child; i++) {
             int status = ap_scoreboard_image->servers[child_num_arg][i].status;
 
@@ -1003,12 +981,6 @@
                 clean_child_exit(APEXIT_CHILDFATAL);
             }
             threads_created++;
-            if (threads_created == 1) {
-                /* now that we have a worker thread, it makes sense to create
-                 * a listener thread (we don't want a listener without a worker!)
-                 */
-                create_listener_thread(ts);
-            }
         }
         if (start_thread_may_exit || threads_created == ap_threads_per_child) {
             break;
@@ -1041,48 +1013,11 @@
     return NULL;
 }
 
-static void join_workers(apr_thread_t *listener, apr_thread_t **threads)
+static void join_workers(apr_thread_t **threads)
 {
     int i;
     apr_status_t rv, thread_rv;
 
-    if (listener) {
-        int iter;
-        
-        /* deal with a rare timing window which affects waking up the
-         * listener thread...  if the signal sent to the listener thread
-         * is delivered between the time it verifies that the
-         * listener_may_exit flag is clear and the time it enters a
-         * blocking syscall, the signal didn't do any good...  work around
-         * that by sleeping briefly and sending it again
-         */
-
-        iter = 0;
-        while (iter < 10 && 
-#ifdef HAVE_PTHREAD_KILL
-               pthread_kill(*listener_os_thread, 0)
-#else
-               kill(ap_my_pid, 0)
-#endif
-               == 0) {
-            /* listener not dead yet */
-            apr_sleep(APR_USEC_PER_SEC / 2);
-            wakeup_listener();
-            ++iter;
-        }
-        if (iter >= 10) {
-            ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
-                         "the listener thread didn't exit");
-        }
-        else {
-            rv = apr_thread_join(&thread_rv, listener);
-            if (rv != APR_SUCCESS) {
-                ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
-                             "apr_thread_join: unable to join listener thread");
-            }
-        }
-    }
-    
     for (i = 0; i < ap_threads_per_child; i++) {
         if (threads[i]) { /* if we ever created this thread */
             rv = apr_thread_join(&thread_rv, threads[i]);
@@ -1181,7 +1116,6 @@
     apr_threadattr_detach_set(thread_attr, 0);
 
     ts->threads = threads;
-    ts->listener = NULL;
     ts->child_num_arg = child_num_arg;
     ts->threadattr = thread_attr;
 
@@ -1221,7 +1155,7 @@
          *   If the worker hasn't exited, then this blocks until
          *   they have (then cleans up).
          */
-        join_workers(ts->listener, threads);
+        join_workers(threads);
     }
     else { /* !one_process */
         /* remove SIGTERM from the set of blocked signals...  if one of
@@ -1262,7 +1196,7 @@
              *   If the worker hasn't exited, then this blocks until
              *   they have (then cleans up).
              */
-            join_workers(ts->listener, threads);
+            join_workers(threads);
         }
     }
 

Reply via email to