Based on the "slow Apache 2.0" thread earlier today, and my observation there that a worker child process can block on a full file descriptor queue (all of its threads busy) while other child processes still have idle threads, I decided to revive the idea of switching the worker MPM's thread management to a leader/followers pattern. It works like this:

* There is no dedicated listener thread. The worker threads take turns serving as the listener.
* Idle threads are kept on a stack, and each thread has its own condition variable. When the current listener accepts a connection, it pops the next idle thread off the stack and wakes it by signaling that thread's condition variable. The newly awakened thread becomes the new listener, while the old listener goes on to handle the connection it just accepted.
* If no idle thread is available to become the new listener, the next thread to finish handling its current connection takes over as listener. (Thus a process that is already saturated with connections won't call accept() until it actually has an idle thread available.)
To implement the patch quickly, I've used a mutex to guard the stack for now, rather than the atomic compare-and-swap operations I once proposed. To improve scalability, though, this mutex is *not* used for the condition variable signaling. Instead, each worker thread has a private mutex for use with its condition variable. This thread-private mutex is locked at thread creation. The only subsequent operations on it are the implicit unlock/relock performed inside cond_wait; cond_signal does not touch the mutex at all. Thus only the thread associated with that mutex ever locks or unlocks it, which should help to reduce synchronization overhead. (The design depends on the semantics of the one-listener-at-a-time model to synchronize the cond_signal with the cond_wait.)

Can I get a few volunteers to test/review this?

Thanks,
--Brian
Index: server/mpm/worker/worker.c =================================================================== RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v retrieving revision 1.114 diff -u -r1.114 worker.c --- server/mpm/worker/worker.c 8 Apr 2002 16:57:06 -0000 1.114 +++ server/mpm/worker/worker.c 10 Apr 2002 05:22:03 -0000 @@ -69,6 +69,7 @@ #include "apr_file_io.h" #include "apr_thread_proc.h" #include "apr_signal.h" +#include "apr_thread_cond.h" #include "apr_thread_mutex.h" #include "apr_proc_mutex.h" #define APR_WANT_STRFUNC @@ -105,7 +106,6 @@ #include "mpm_common.h" #include "ap_listen.h" #include "scoreboard.h" -#include "fdqueue.h" #include "mpm_default.h" #include <signal.h> @@ -168,11 +168,9 @@ static int dying = 0; static int workers_may_exit = 0; static int start_thread_may_exit = 0; -static int listener_may_exit = 0; static int requests_this_child; static int num_listensocks = 0; static int resource_shortage = 0; -static fd_queue_t *worker_queue; /* The structure used to pass unique initialization info to each thread */ typedef struct { @@ -181,12 +179,12 @@ int sd; } proc_info; + /* Structure used to pass information to the thread responsible for * creating the rest of the threads. */ typedef struct { apr_thread_t **threads; - apr_thread_t *listener; int child_num_arg; apr_threadattr_t *threadattr; } thread_starter; @@ -232,7 +230,6 @@ static pid_t ap_my_pid; /* Linux getpid() doesn't work except in main thread. Use this instead */ static pid_t parent_pid; -static apr_os_thread_t *listener_os_thread; /* Locks for accept serialization */ static apr_proc_mutex_t *accept_mutex; @@ -243,37 +240,103 @@ #define SAFE_ACCEPT(stmt) (stmt) #endif -/* The LISTENER_SIGNAL signal will be sent from the main thread to the - * listener thread to wake it up for graceful termination (what a child - * process from an old generation does when the admin does "apachectl - * graceful"). 
This signal will be blocked in all threads of a child - * process except for the listener thread. + +/* Structure used to wake up an idle worker thread + */ +typedef struct { + apr_thread_cond_t *cond; + apr_thread_mutex_t *mutex; +} worker_wakeup_info; + +/* Structure used to hold a stack of idle worker threads */ -#define LISTENER_SIGNAL SIGHUP +typedef struct { + apr_thread_mutex_t *mutex; + int no_listener; + worker_wakeup_info **stack; + apr_size_t nelts; + apr_size_t nalloc; +} worker_stack; + +static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max) +{ + apr_status_t rv; + worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack)); + + if ((rv = apr_thread_mutex_create(&stack->mutex, APR_THREAD_MUTEX_DEFAULT, + pool)) != APR_SUCCESS) { + return NULL; + } + stack->no_listener = 1; + stack->nelts = 0; + stack->nalloc = max; + stack->stack = + (worker_wakeup_info **)apr_palloc(pool, stack->nalloc * + sizeof(worker_wakeup_info *)); + return stack; +} -static void wakeup_listener(void) +static apr_status_t worker_stack_wait(worker_stack *stack, + worker_wakeup_info *wakeup) { - listener_may_exit = 1; - if (!listener_os_thread) { - /* XXX there is an obscure path that this doesn't handle perfectly: - * right after listener thread is created but before - * listener_os_thread is set, the first worker thread hits an - * error and starts graceful termination + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + if (stack->no_listener) { + /* this thread should become the new listener immediately */ + stack->no_listener = 0; + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + return APR_SUCCESS; + } + else { + /* push this thread onto the stack of idle workers, and block + * on the condition variable until awoken */ - return; + if (stack->nelts == stack->nalloc) { + return APR_ENOSPC; + } + stack->stack[stack->nelts++] = wakeup; + if ((rv = 
apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) != + APR_SUCCESS) { + return rv; + } + return APR_SUCCESS; } - /* - * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all - * platforms and wake up the listener thread since it is the only thread - * with SIGHUP unblocked, but that doesn't work on Linux - */ -#ifdef HAVE_PTHREAD_KILL - pthread_kill(*listener_os_thread, LISTENER_SIGNAL); -#else - kill(ap_my_pid, LISTENER_SIGNAL); -#endif } +static apr_status_t worker_stack_awaken_next(worker_stack *stack) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + if (stack->nelts) { + worker_wakeup_info *wakeup = stack->stack[--stack->nelts]; + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) { + apr_thread_mutex_unlock(stack->mutex); + return rv; + } + } + else { + stack->no_listener = 1; + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + } + return APR_SUCCESS; +} + +static worker_stack *idle_worker_stack; + #define ST_INIT 0 #define ST_GRACEFUL 1 #define ST_UNGRACEFUL 2 @@ -282,23 +345,15 @@ static void signal_threads(int mode) { + int i; if (terminate_mode == mode) { return; } terminate_mode = mode; - /* in case we weren't called from the listener thread, wake up the - * listener thread - */ - wakeup_listener(); - - /* for ungraceful termination, let the workers exit now; - * for graceful termination, the listener thread will notify the - * workers to exit once it has stopped accepting new connections - */ - if (mode == ST_UNGRACEFUL) { - workers_may_exit = 1; - ap_queue_interrupt_all(worker_queue); + workers_may_exit = 1; + for (i = 0; i < ap_threads_per_child; i++) { + (void)worker_stack_awaken_next(idle_worker_stack); } } @@ -576,10 +631,7 @@ * maybe it should be 
ap_mpm_process_exiting? */ { - /* note: for a graceful termination, listener_may_exit will be set before - * workers_may_exit, so check listener_may_exit - */ - return listener_may_exit; + return workers_may_exit; } /***************************************************************** @@ -659,45 +711,83 @@ */ } -static void *listener_thread(apr_thread_t *thd, void * dummy) +static void *worker_thread(apr_thread_t *thd, void * dummy) { proc_info * ti = dummy; int process_slot = ti->pid; + int thread_slot = ti->tid; apr_pool_t *tpool = apr_thread_pool_get(thd); void *csd = NULL; + apr_allocator_t *allocator; apr_pool_t *ptrans; /* Pool for per-transaction stuff */ - apr_pool_t *recycled_pool = NULL; + apr_bucket_alloc_t *bucket_alloc; int n; apr_pollfd_t *pollset; apr_status_t rv; ap_listen_rec *lr, *last_lr = ap_listeners; + worker_wakeup_info *wakeup; + int is_listener; + + ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, +NULL); free(ti); + apr_allocator_create(&allocator); + apr_pool_create_ex(&ptrans, NULL, NULL, allocator); + apr_allocator_set_owner(allocator, ptrans); + bucket_alloc = apr_bucket_alloc_create(tpool); + + wakeup = (worker_wakeup_info *)apr_palloc(tpool, sizeof(*wakeup)); + if ((rv = apr_thread_cond_create(&wakeup->cond, tpool)) != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, + "apr_thread_cond_create failed. Attempting to shutdown " + "process gracefully."); + signal_threads(ST_GRACEFUL); + goto done; + } + if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT, + tpool)) != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, + "apr_thread_mutex_create failed. 
Attempting to shutdown " + "process gracefully."); + signal_threads(ST_GRACEFUL); + goto done; + } + apr_thread_mutex_lock(wakeup->mutex); + apr_poll_setup(&pollset, num_listensocks, tpool); for(lr = ap_listeners ; lr != NULL ; lr = lr->next) apr_poll_socket_add(pollset, lr->sd, APR_POLLIN); - /* Unblock the signal used to wake this thread up, and set a handler for - * it. - */ - unblock_signal(LISTENER_SIGNAL); - apr_signal(LISTENER_SIGNAL, dummy_signal_handler); - /* TODO: Switch to a system where threads reuse the results from earlier poll calls - manoj */ - while (1) { + is_listener = 0; + while (!workers_may_exit) { + + ap_update_child_status_from_indexes(process_slot, thread_slot, + SERVER_READY, NULL); + if (!is_listener) { + /* Wait until it's our turn to become the listener */ + if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) != + APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, + "worker_stack_wait failed. Shutting down"); + break; + } + is_listener = 1; + } + /* TODO: requests_this_child should be synchronized - aaron */ if (requests_this_child <= 0) { check_infinite_requests(); } - if (listener_may_exit) break; + if (workers_may_exit) break; if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex))) != APR_SUCCESS) { int level = APLOG_EMERG; - if (listener_may_exit) { + if (workers_may_exit) { break; } if (ap_scoreboard_image->parent[process_slot].generation != @@ -716,7 +806,7 @@ lr = ap_listeners; } else { - while (!listener_may_exit) { + while (!workers_may_exit) { apr_status_t ret; apr_int16_t event; @@ -733,7 +823,7 @@ signal_threads(ST_GRACEFUL); } - if (listener_may_exit) break; + if (workers_may_exit) break; /* find a listener */ lr = last_lr; @@ -752,19 +842,7 @@ } } got_fd: - if (!listener_may_exit) { - /* create a new transaction pool for each accepted socket */ - if (recycled_pool == NULL) { - apr_allocator_t *allocator; - - apr_allocator_create(&allocator); - apr_pool_create_ex(&ptrans, NULL, NULL, 
allocator); - apr_allocator_set_owner(allocator, ptrans); - } - else { - ptrans = recycled_pool; - } - apr_pool_tag(ptrans, "transaction"); + if (!workers_may_exit) { rv = lr->accept_func(&csd, lr, ptrans); /* If we were interrupted for whatever reason, just start @@ -782,7 +860,7 @@ != APR_SUCCESS) { int level = APLOG_EMERG; - if (listener_may_exit) { + if (workers_may_exit) { break; } if (ap_scoreboard_image->parent[process_slot].generation != @@ -795,16 +873,13 @@ signal_threads(ST_GRACEFUL); } if (csd != NULL) { - rv = ap_queue_push(worker_queue, csd, ptrans, - &recycled_pool); - if (rv) { - /* trash the connection; we couldn't queue the connected - * socket to a worker - */ - apr_socket_close(csd); - ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, - "ap_queue_push failed"); - } + is_listener = 0; + worker_stack_awaken_next(idle_worker_stack); + process_socket(ptrans, csd, process_slot, + thread_slot, bucket_alloc); + apr_pool_clear(ptrans); + requests_this_child--; + apr_socket_close(csd); /* Debug only */ } } else { @@ -819,75 +894,10 @@ } } - ap_queue_term(worker_queue); + done: dying = 1; ap_scoreboard_image->parent[process_slot].quiescing = 1; - - /* wake up the main thread */ - kill(ap_my_pid, SIGTERM); - - apr_thread_exit(thd, APR_SUCCESS); - return NULL; -} - -/* XXX For ungraceful termination/restart, we definitely don't want to - * wait for active connections to finish but we may want to wait - * for idle workers to get out of the queue code and release mutexes, - * since those mutexes are cleaned up pretty soon and some systems - * may not react favorably (i.e., segfault) if operations are attempted - * on cleaned-up mutexes. 
- */ -static void * APR_THREAD_FUNC worker_thread(apr_thread_t *thd, void * dummy) -{ - proc_info * ti = dummy; - int process_slot = ti->pid; - int thread_slot = ti->tid; - apr_socket_t *csd = NULL; - apr_bucket_alloc_t *bucket_alloc; - apr_pool_t *last_ptrans = NULL; - apr_pool_t *ptrans; /* Pool for per-transaction stuff */ - apr_status_t rv; - - free(ti); - - ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL); - - bucket_alloc = apr_bucket_alloc_create(apr_thread_pool_get(thd)); - - while (!workers_may_exit) { - ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_READY, NULL); - rv = ap_queue_pop(worker_queue, &csd, &ptrans, last_ptrans); - last_ptrans = NULL; - - if (rv != APR_SUCCESS) { - /* We get APR_EOF during a graceful shutdown once all the connections - * accepted by this server process have been handled. - */ - if (rv == APR_EOF) { - break; - } - /* We get APR_EINTR whenever ap_queue_pop() has been interrupted - * from an explicit call to ap_queue_interrupt_all(). This allows - * us to unblock threads stuck in ap_queue_pop() when a shutdown - * is pending. - * - * If workers_may_exit is set and this is ungraceful termination/ - * restart, we are bound to get an error on some systems (e.g., - * AIX, which sanity-checks mutex operations) since the queue - * may have already been cleaned up. Don't log the "error" if - * workers_may_exit is set. - */ - if (rv != APR_EINTR && !workers_may_exit) { - ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, - "ap_queue_pop failed"); - } - continue; - } - process_socket(ptrans, csd, process_slot, thread_slot, bucket_alloc); - requests_this_child--; /* FIXME: should be synchronized - aaron */ - apr_pool_clear(ptrans); - last_ptrans = ptrans; - } + worker_stack_awaken_next(idle_worker_stack); ap_update_child_status_from_indexes(process_slot, thread_slot, (dying) ? 
SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL); @@ -908,34 +918,6 @@ return 0; } -static void create_listener_thread(thread_starter *ts) -{ - int my_child_num = ts->child_num_arg; - apr_threadattr_t *thread_attr = ts->threadattr; - proc_info *my_info; - apr_status_t rv; - - my_info = (proc_info *)malloc(sizeof(proc_info)); - my_info->pid = my_child_num; - my_info->tid = -1; /* listener thread doesn't have a thread slot */ - my_info->sd = 0; - rv = apr_thread_create(&ts->listener, thread_attr, listener_thread, - my_info, pchild); - if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf, - "apr_thread_create: unable to create listener thread"); - /* In case system resources are maxxed out, we don't want - * Apache running away with the CPU trying to fork over and - * over and over again if we exit. - * XXX Jeff doesn't see how Apache is going to try to fork again since - * the exit code is APEXIT_CHILDFATAL - */ - apr_sleep(10 * APR_USEC_PER_SEC); - clean_child_exit(APEXIT_CHILDFATAL); - } - apr_os_thread_get(&listener_os_thread, ts->listener); -} - /* XXX under some circumstances not understood, children can get stuck * in start_threads forever trying to take over slots which will * never be cleaned up; for now there is an APLOG_DEBUG message issued @@ -955,19 +937,15 @@ int loops; int prev_threads_created; - /* We must create the fd queues before we start up the listener - * and worker threads. 
*/ - worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue)); - rv = ap_queue_init(worker_queue, ap_threads_per_child, pchild); - if (rv != APR_SUCCESS) { + idle_worker_stack = worker_stack_create(pchild, ap_threads_per_child); + if (idle_worker_stack == NULL) { ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf, - "ap_queue_init() failed"); + "worker_stack_create() failed"); clean_child_exit(APEXIT_CHILDFATAL); } loops = prev_threads_created = 0; while (1) { - /* ap_threads_per_child does not include the listener thread */ for (i = 0; i < ap_threads_per_child; i++) { int status = ap_scoreboard_image->servers[child_num_arg][i].status; @@ -1003,12 +981,6 @@ clean_child_exit(APEXIT_CHILDFATAL); } threads_created++; - if (threads_created == 1) { - /* now that we have a worker thread, it makes sense to create - * a listener thread (we don't want a listener without a worker!) - */ - create_listener_thread(ts); - } } if (start_thread_may_exit || threads_created == ap_threads_per_child) { break; @@ -1041,48 +1013,11 @@ return NULL; } -static void join_workers(apr_thread_t *listener, apr_thread_t **threads) +static void join_workers(apr_thread_t **threads) { int i; apr_status_t rv, thread_rv; - if (listener) { - int iter; - - /* deal with a rare timing window which affects waking up the - * listener thread... if the signal sent to the listener thread - * is delivered between the time it verifies that the - * listener_may_exit flag is clear and the time it enters a - * blocking syscall, the signal didn't do any good... 
work around - * that by sleeping briefly and sending it again - */ - - iter = 0; - while (iter < 10 && -#ifdef HAVE_PTHREAD_KILL - pthread_kill(*listener_os_thread, 0) -#else - kill(ap_my_pid, 0) -#endif - == 0) { - /* listener not dead yet */ - apr_sleep(APR_USEC_PER_SEC / 2); - wakeup_listener(); - ++iter; - } - if (iter >= 10) { - ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf, - "the listener thread didn't exit"); - } - else { - rv = apr_thread_join(&thread_rv, listener); - if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, - "apr_thread_join: unable to join listener thread"); - } - } - } - for (i = 0; i < ap_threads_per_child; i++) { if (threads[i]) { /* if we ever created this thread */ rv = apr_thread_join(&thread_rv, threads[i]); @@ -1181,7 +1116,6 @@ apr_threadattr_detach_set(thread_attr, 0); ts->threads = threads; - ts->listener = NULL; ts->child_num_arg = child_num_arg; ts->threadattr = thread_attr; @@ -1221,7 +1155,7 @@ * If the worker hasn't exited, then this blocks until * they have (then cleans up). */ - join_workers(ts->listener, threads); + join_workers(threads); } else { /* !one_process */ /* remove SIGTERM from the set of blocked signals... if one of @@ -1262,7 +1196,7 @@ * If the worker hasn't exited, then this blocks until * they have (then cleans up). */ - join_workers(ts->listener, threads); + join_workers(threads); } }