The concept of multiplexing Apache's lingering
close comes from lingerd, but I thought it'd be
interesting to try the same thing for worker with
a dedicated closer thread. This is my first foray
into thread programming so expect rookie mistakes
in the patch (please be gentle :-).
The patch is in three parts: one for httpd-2.0 HEAD,
one for apr HEAD, and two new files, closer.[ch], that
need to be placed in httpd-2.0/server/mpm/worker.
The files implementing the closer thread are
available here
http://cvs.apache.org/~joes/closer.h
http://cvs.apache.org/~joes/closer.c
Patches for httpd-2.0 and apr HEAD are attached:
? apr.tag
? html
? latex
? build/apr_rules.out
? include/apr_tables.h.cur
? include/apr_tables.h.current
? include/apr_tables.h.new
? include/apr_tables.h.vtable
? tables/apr_tables.c.cur
? tables/apr_tables.c.current
? tables/apr_tables.c.new
? tables/apr_tables.c.vtable
Index: include/apr_network_io.h
===================================================================
RCS file: /home/cvspublic/apr/include/apr_network_io.h,v
retrieving revision 1.153
diff -u -r1.153 apr_network_io.h
--- include/apr_network_io.h 13 Feb 2004 09:38:28 -0000 1.153
+++ include/apr_network_io.h 25 Aug 2004 19:57:50 -0000
@@ -559,6 +559,10 @@
APR_DECLARE(apr_status_t) apr_socket_recv(apr_socket_t *sock,
char *buf, apr_size_t *len);
+APR_DECLARE(apr_status_t) apr_socket_setaside(apr_socket_t **new_sock,
+ apr_socket_t *old_sock,
+ apr_pool_t *p);
+
/**
* Setup socket options for the specified socket
* @param sock The socket to set up.
Index: network_io/unix/sockets.c
===================================================================
RCS file: /home/cvspublic/apr/network_io/unix/sockets.c,v
retrieving revision 1.120
diff -u -r1.120 sockets.c
--- network_io/unix/sockets.c 10 Mar 2004 20:58:34 -0000 1.120
+++ network_io/unix/sockets.c 25 Aug 2004 19:57:51 -0000
@@ -387,6 +387,38 @@
return APR_SUCCESS;
}
+APR_DECLARE(apr_status_t) apr_socket_setaside(apr_socket_t **new_sock,
+ apr_socket_t *old_sock,
+ apr_pool_t *p)
+{
+ apr_status_t rv;
+ apr_os_sock_t sd;
+ apr_os_sock_get(&sd, old_sock);
+
+ alloc_socket(new_sock, p);
+
+ /* XXX This ignores local_addr, remote_addr, pollset and userdata */
+ rv = apr_os_sock_put(new_sock, &sd, p);
+ (*new_sock)->type = old_sock->type;
+ (*new_sock)->protocol = old_sock->protocol;
+ (*new_sock)->timeout = old_sock->timeout;
+
+#ifndef HAVE_POLL
+ (*new_sock)->connected = old_sock->connected;
+#endif
+ (*new_sock)->options = old_sock->options;
+ (*new_sock)->inherit = old_sock->inherit;
+
+ old_sock->socketdes = -1;
+
+ apr_pool_cleanup_register((*new_sock)->cntxt, (void *)(*new_sock),
+ socket_cleanup, socket_cleanup);
+ apr_pool_cleanup_kill(old_sock->cntxt, (void *)(old_sock), socket_cleanup);
+
+ return rv;
+}
+
+
APR_IMPLEMENT_INHERIT_SET(socket, inherit, cntxt, socket_cleanup)
APR_IMPLEMENT_INHERIT_UNSET(socket, inherit, cntxt, socket_cleanup)
? modules/ldap/.deps
? modules/ldap/Makefile
? modules/ldap/modules.mk
? server/mpm/worker/closer.c
? server/mpm/worker/closer.h
Index: server/mpm/worker/Makefile.in
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/Makefile.in,v
retrieving revision 1.2
diff -u -r1.2 Makefile.in
--- server/mpm/worker/Makefile.in 11 Feb 2002 04:56:10 -0000 1.2
+++ server/mpm/worker/Makefile.in 25 Aug 2004 19:58:36 -0000
@@ -1,5 +1,5 @@
LTLIBRARY_NAME = libworker.la
-LTLIBRARY_SOURCES = worker.c fdqueue.c pod.c
+LTLIBRARY_SOURCES = worker.c fdqueue.c pod.c closer.c
include $(top_srcdir)/build/ltlib.mk
Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.151
diff -u -r1.151 worker.c
--- server/mpm/worker/worker.c 15 Mar 2004 23:08:41 -0000 1.151
+++ server/mpm/worker/worker.c 25 Aug 2004 19:58:39 -0000
@@ -64,6 +64,7 @@
#include "ap_listen.h"
#include "scoreboard.h"
#include "fdqueue.h"
+#include "closer.h"
#include "mpm_default.h"
#include <signal.h>
@@ -132,8 +133,10 @@
static int resource_shortage = 0;
static fd_queue_t *worker_queue;
static fd_queue_info_t *worker_queue_info;
+
static int mpm_state = AP_MPMQ_STARTING;
+
/* The structure used to pass unique initialization info to each thread */
typedef struct {
int pid;
@@ -147,6 +150,7 @@
typedef struct {
apr_thread_t **threads;
apr_thread_t *listener;
+ apr_thread_t *closer;
int child_num_arg;
apr_threadattr_t *threadattr;
} thread_starter;
@@ -275,6 +279,11 @@
if (mode == ST_UNGRACEFUL) {
workers_may_exit = 1;
ap_queue_interrupt_all(worker_queue);
+
+#ifdef ENABLE_CLOSER_THREAD
+ closer_may_exit = 1;
+ ap_queue_interrupt_all(closer_queue);
+#endif
ap_queue_info_term(worker_queue_info);
close_worker_sockets(); /* forcefully kill all current connections */
}
@@ -518,7 +527,98 @@
conn_id, sbh, bucket_alloc);
if (current_conn) {
ap_process_connection(current_conn, sock);
+
+#ifndef ENABLE_CLOSER_THREAD
ap_lingering_close(current_conn);
+ return;
+#endif
+
+ /* Try to shutdown both sides of the socket immediately.
+ * If that's not possible (because apr_socket_recv would block), then
+ * hand the socket off to the closer_thread.
+ */
+
+ ap_flush_conn(current_conn);
+ ap_update_child_status(current_conn->sbh, SERVER_CLOSING, NULL);
+
+ /* Shut down the socket for write, which will send a FIN
+ * to the peer. Then hand socket off to closer_thread
+ * to mux the lingering close operation.
+ */
+ if (!current_conn->aborted &&
+ apr_socket_shutdown(sock, APR_SHUTDOWN_WRITE) == APR_SUCCESS) {
+ char dummy[1024];
+ apr_size_t nb;
+ apr_status_t rv;
+
+ /* want nonblocking reads from here on out */
+ apr_socket_timeout_set(sock, 0);
+
+ /* flush the socket buffer, hoping for a quick APR_EOF */
+ do {
+ nb = sizeof dummy;
+ rv = apr_socket_recv(sock, dummy, &nb);
+ } while (rv == APR_SUCCESS);
+
+ if (rv != APR_EAGAIN)
+ return;
+
+ rv = apr_thread_mutex_lock(closer_queue->one_big_mutex);
+
+ if (rv == APR_SUCCESS && !closer_may_exit) {
+
+ if (closer_queue->nelts < closer_queue->bounds) {
+ fd_queue_elem_t *elem;
+
+ elem = &closer_queue->data[closer_queue->nelts++];
+ apr_socket_setaside(&elem->sd, sock, pchild);
+ elem->p = pchild;
+
+ if (closer_waiting_for_signal
+ && closer_queue->nelts == 1)
+ {
+ rv = apr_thread_cond_signal(closer_queue->not_empty);
+ }
+
+ rv = apr_thread_mutex_unlock(closer_queue->one_big_mutex);
+ if (rv != APR_SUCCESS)
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "closer_queue %d: apr_thread_mutex_unlock "
+ "failed (tid=%d)", my_child_num,
+ my_thread_num);
+
+ }
+ else {
+ /* OOPS! The closer_queue is full, so
+ * we need to do the lingering ourself.
+ */
+ int total_linger_time = 0;
+
+ rv = apr_thread_mutex_unlock(closer_queue->one_big_mutex);
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "closer_queue %d is full (tid=%d)",
+ my_child_num, my_thread_num);
+
+ apr_socket_timeout_set(sock, MICROS_TO_LINGER);
+ apr_socket_opt_set(sock, APR_INCOMPLETE_READ, 1);
+
+ while (1) {
+ nb = sizeof dummy;
+ rv = apr_socket_recv(sock, dummy, &nb);
+ if (rv != APR_SUCCESS || nb == 0)
+ break;
+
+ total_linger_time += SECONDS_TO_LINGER;
+ if (total_linger_time >= MAX_SECS_TO_LINGER) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /* The caller's pending cleanup of the connection
+ pool p will actually close the socket sock. */
}
}
@@ -945,7 +1045,16 @@
worker_sockets = apr_pcalloc(pchild, ap_threads_per_child
* sizeof(apr_socket_t *));
+#ifdef ENABLE_CLOSER_THREAD
+ rv = create_closer_thread(ts->child_num_arg, ts->threadattr,
+ pchild, &ts->closer);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
+ "create_closer_thread() failed");
+ clean_child_exit(APEXIT_CHILDFATAL);
+ }
+#endif
loops = prev_threads_created = 0;
while (1) {
/* ap_threads_per_child does not include the listener thread */
--
Joe Schaefer