The concept of multiplexing Apache's lingering
close comes from lingerd, but I thought it'd be
interesting to try the same thing for the worker MPM
with a dedicated closer thread.  This is my first foray
into threaded programming, so expect rookie mistakes
in the patch (please be gentle :-).

The patch is in three parts: one for httpd-2.0 HEAD,
one for apr HEAD, and two new files, closer.[ch], that
need to be placed in httpd-2.0/server/mpm/worker.

The files implementing the closer thread are
available here:

  http://cvs.apache.org/~joes/closer.h
  http://cvs.apache.org/~joes/closer.c

Patches for httpd-2.0 and apr HEAD are attached:

? apr.tag
? html
? latex
? build/apr_rules.out
? include/apr_tables.h.cur
? include/apr_tables.h.current
? include/apr_tables.h.new
? include/apr_tables.h.vtable
? tables/apr_tables.c.cur
? tables/apr_tables.c.current
? tables/apr_tables.c.new
? tables/apr_tables.c.vtable
Index: include/apr_network_io.h
===================================================================
RCS file: /home/cvspublic/apr/include/apr_network_io.h,v
retrieving revision 1.153
diff -u -r1.153 apr_network_io.h
--- include/apr_network_io.h	13 Feb 2004 09:38:28 -0000	1.153
+++ include/apr_network_io.h	25 Aug 2004 19:57:50 -0000
@@ -559,6 +559,10 @@
 APR_DECLARE(apr_status_t) apr_socket_recv(apr_socket_t *sock, 
                                    char *buf, apr_size_t *len);
 
+APR_DECLARE(apr_status_t) apr_socket_setaside(apr_socket_t **new_sock,
+                                              apr_socket_t *old_sock,
+                                              apr_pool_t *p);
+
 /**
  * Setup socket options for the specified socket
  * @param sock The socket to set up.
Index: network_io/unix/sockets.c
===================================================================
RCS file: /home/cvspublic/apr/network_io/unix/sockets.c,v
retrieving revision 1.120
diff -u -r1.120 sockets.c
--- network_io/unix/sockets.c	10 Mar 2004 20:58:34 -0000	1.120
+++ network_io/unix/sockets.c	25 Aug 2004 19:57:51 -0000
@@ -387,6 +387,38 @@
     return APR_SUCCESS;
 }
 
+APR_DECLARE(apr_status_t) apr_socket_setaside(apr_socket_t **new_sock,
+                                              apr_socket_t *old_sock,
+                                              apr_pool_t *p)
+{   /* Move old_sock's OS descriptor into a new socket, *new_sock. */
+    apr_status_t rv;
+    apr_os_sock_t sd;
+    apr_os_sock_get(&sd, old_sock);
+    /* NOTE(review): the status returned by apr_os_sock_get() is discarded. */
+    alloc_socket(new_sock, p);
+    /* Wrap the descriptor in *new_sock, then copy old_sock's settings. */
+    /* XXX This ignores local_addr, remote_addr, pollset and userdata */
+    rv = apr_os_sock_put(new_sock, &sd, p);
+    (*new_sock)->type     = old_sock->type;
+    (*new_sock)->protocol = old_sock->protocol;
+    (*new_sock)->timeout  = old_sock->timeout;
+
+#ifndef HAVE_POLL
+    (*new_sock)->connected = old_sock->connected;
+#endif
+    (*new_sock)->options = old_sock->options;
+    (*new_sock)->inherit = old_sock->inherit;
+    /* old_sock gives up the descriptor: its socketdes becomes -1. */
+    old_sock->socketdes = -1;
+    /* Register socket_cleanup for *new_sock; drop it from old_sock's pool. */
+    apr_pool_cleanup_register((*new_sock)->cntxt, (void *)(*new_sock), 
+                              socket_cleanup, socket_cleanup);
+    apr_pool_cleanup_kill(old_sock->cntxt, (void *)(old_sock), socket_cleanup);
+    /* Status of apr_os_sock_put(); *new_sock was already used above without checking it. */
+    return rv;
+}
+
+
 APR_IMPLEMENT_INHERIT_SET(socket, inherit, cntxt, socket_cleanup)
 
 APR_IMPLEMENT_INHERIT_UNSET(socket, inherit, cntxt, socket_cleanup)
? modules/ldap/.deps
? modules/ldap/Makefile
? modules/ldap/modules.mk
? server/mpm/worker/closer.c
? server/mpm/worker/closer.h
Index: server/mpm/worker/Makefile.in
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/Makefile.in,v
retrieving revision 1.2
diff -u -r1.2 Makefile.in
--- server/mpm/worker/Makefile.in	11 Feb 2002 04:56:10 -0000	1.2
+++ server/mpm/worker/Makefile.in	25 Aug 2004 19:58:36 -0000
@@ -1,5 +1,5 @@
 
 LTLIBRARY_NAME    = libworker.la
-LTLIBRARY_SOURCES = worker.c fdqueue.c pod.c
+LTLIBRARY_SOURCES = worker.c fdqueue.c pod.c closer.c
 
 include $(top_srcdir)/build/ltlib.mk
Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.151
diff -u -r1.151 worker.c
--- server/mpm/worker/worker.c	15 Mar 2004 23:08:41 -0000	1.151
+++ server/mpm/worker/worker.c	25 Aug 2004 19:58:39 -0000
@@ -64,6 +64,7 @@
 #include "ap_listen.h"
 #include "scoreboard.h" 
 #include "fdqueue.h"
+#include "closer.h"
 #include "mpm_default.h"
 
 #include <signal.h>
@@ -132,8 +133,10 @@
 static int resource_shortage = 0;
 static fd_queue_t *worker_queue;
 static fd_queue_info_t *worker_queue_info;
+
 static int mpm_state = AP_MPMQ_STARTING;
 
+
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
     int pid;
@@ -147,6 +150,7 @@
 typedef struct {
     apr_thread_t **threads;
     apr_thread_t *listener;
+    apr_thread_t *closer;
     int child_num_arg;
     apr_threadattr_t *threadattr;
 } thread_starter;
@@ -275,6 +279,11 @@
     if (mode == ST_UNGRACEFUL) {
         workers_may_exit = 1;
         ap_queue_interrupt_all(worker_queue);
+
+#ifdef ENABLE_CLOSER_THREAD
+        closer_may_exit = 1;
+        ap_queue_interrupt_all(closer_queue);
+#endif
         ap_queue_info_term(worker_queue_info);
         close_worker_sockets(); /* forcefully kill all current connections */
     }
@@ -518,7 +527,98 @@
                                             conn_id, sbh, bucket_alloc);
     if (current_conn) {
         ap_process_connection(current_conn, sock);
+
+#ifndef ENABLE_CLOSER_THREAD
         ap_lingering_close(current_conn);
+        return;
+#endif
+
+        /* Try to shutdown both sides of the socket immediately.
+         * If that's not possible (because apr_socket_recv would block), then 
+         * hand the socket off to the closer_thread.
+         */
+
+        ap_flush_conn(current_conn);
+        ap_update_child_status(current_conn->sbh, SERVER_CLOSING, NULL);
+
+        /* Shut down the socket for write, which will send a FIN
+         * to the peer.  Then hand socket off to closer_thread
+         * to mux the lingering close operation.
+         */
+        if (!current_conn->aborted &&
+            apr_socket_shutdown(sock, APR_SHUTDOWN_WRITE) == APR_SUCCESS) {
+            char dummy[1024];
+            apr_size_t nb;
+            apr_status_t rv;
+            
+            /* want nonblocking reads from here on out */
+            apr_socket_timeout_set(sock, 0);
+
+            /* flush the socket buffer, hoping for a quick APR_EOF */
+            do {
+                nb = sizeof dummy;
+                rv = apr_socket_recv(sock, dummy, &nb);
+            } while (rv == APR_SUCCESS);
+
+            if (rv != APR_EAGAIN)
+                return;
+
+            rv = apr_thread_mutex_lock(closer_queue->one_big_mutex);
+
+            if (rv == APR_SUCCESS && !closer_may_exit) {
+
+                if (closer_queue->nelts < closer_queue->bounds) {
+                    fd_queue_elem_t *elem;
+
+                    elem = &closer_queue->data[closer_queue->nelts++];
+                    apr_socket_setaside(&elem->sd, sock, pchild);
+                    elem->p = pchild;
+
+                    if (closer_waiting_for_signal
+                        && closer_queue->nelts == 1)
+                    {
+                        rv = apr_thread_cond_signal(closer_queue->not_empty);
+                    }
+
+                    rv = apr_thread_mutex_unlock(closer_queue->one_big_mutex);
+                    if (rv != APR_SUCCESS)
+                        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, 
+                                     "closer_queue %d: apr_thread_mutex_unlock "
+                                     "failed (tid=%d)",  my_child_num, 
+                                     my_thread_num);
+
+                }
+                else {
+                    /* OOPS! The closer_queue is full, so
+                     * we need to do the lingering ourself.
+                     */
+                    int total_linger_time = 0;
+
+                    rv = apr_thread_mutex_unlock(closer_queue->one_big_mutex);
+                    ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, 
+                                 "closer_queue %d is full (tid=%d)",
+                                 my_child_num, my_thread_num);
+
+                    apr_socket_timeout_set(sock, MICROS_TO_LINGER);
+                    apr_socket_opt_set(sock, APR_INCOMPLETE_READ, 1);
+
+                    while (1) {
+                        nb = sizeof dummy;
+                        rv = apr_socket_recv(sock, dummy, &nb);
+                        if (rv != APR_SUCCESS || nb == 0)
+                            break;
+
+                        total_linger_time += SECONDS_TO_LINGER;
+                        if (total_linger_time >= MAX_SECS_TO_LINGER) {
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        /* The caller's pending cleanup of the connection 
+           pool p will actually close the socket sock. */
     }
 }
 
@@ -945,7 +1045,16 @@
 
     worker_sockets = apr_pcalloc(pchild, ap_threads_per_child
                                         * sizeof(apr_socket_t *));
+#ifdef ENABLE_CLOSER_THREAD
+    rv = create_closer_thread(ts->child_num_arg, ts->threadattr, 
+                              pchild, &ts->closer);
 
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
+                     "create_closer_thread() failed");
+        clean_child_exit(APEXIT_CHILDFATAL);
+    }
+#endif
     loops = prev_threads_created = 0;
     while (1) {
         /* ap_threads_per_child does not include the listener thread */
-- 
Joe Schaefer

Reply via email to