On Tue, Jul 13, 2021 at 4:16 PM Stefan Eissing
<[email protected]> wrote:
>
> In Yann's v3 of the patch, it triggers only when I stop the server while the 
> test case is ongoing,

OK, thanks. I have a new v6 which should address this and also still
calls the pre_close hooks in any case.
The patch is a bit "verbose" (some cosmetic/comment changes that
helped my workflow, sorry about that), but in the end I think it's
simpler to maintain (as Eric asked?), which may be worth it.

>
> [Tue Jul 13 14:12:18.800125 2021] [mpm_event:trace4] [pid 47786:tid 
> 123145484374016] event.c(563): closing socket 15/7f95ba07b4a0
> [Tue Jul 13 14:12:19.551039 2021] [mpm_event:trace4] [pid 47787:tid 
> 123145488666624] event.c(1322): closing listeners (connection_count=0)
> [Tue Jul 13 14:12:19.551146 2021] [mpm_event:trace4] [pid 47786:tid 
> 123145497251840] event.c(1322): closing listeners (connection_count=1)
> [Tue Jul 13 14:12:19.551176 2021] [mpm_event:trace4] [pid 47788:tid 
> 123145488666624] event.c(1322): closing listeners (connection_count=0)
> [Tue Jul 13 14:12:19.551209 2021] [mpm_event:trace4] [pid 47786:tid 
> 4476272128] event.c(563): closing socket 15/7f95ba07b4b0
> [Tue Jul 13 14:12:19.552305 2021] [mpm_event:trace4] [pid 47792:tid 
> 123145488666624] event.c(1322): closing listeners (connection_count=0)
> [Tue Jul 13 14:12:19.805283 2021] [mpm_event:trace4] [pid 47786:tid 
> 123145484910592] event.c(563): closing socket -1/7f95ba07b4b0
> [Tue Jul 13 14:12:19.805289 2021] [mpm_event:error] [pid 47786:tid 
> 123145484910592] (9)Bad file descriptor: AH00468: error closing socket 
> -1/7f95ba07b4b0
>
> You can see that socket 7f95ba07b4b0 is closed twice.

Indeed, where does the first close come from (which caller)?
If you still see this with the new patch (LogLevel mpm_event:trace8),
can you confirm that it comes from close_worker_sockets() (i.e. only
for an ungraceful restart)?

Cheers;
Yann.
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c	(revision 1891501)
+++ server/mpm/event/event.c	(working copy)
@@ -254,6 +254,8 @@ struct event_conn_state_t {
     conn_state_t pub;
     /** chaining in defer_linger_chain */
     struct event_conn_state_t *chain;
+    /** Is lingering close from defer_lingering_close()? */
+    int deferred_linger;
 };
 
 APR_RING_HEAD(timeout_head_t, event_conn_state_t);
@@ -289,9 +291,10 @@ static volatile apr_time_t  queues_next_expiry;
  * Macros for accessing struct timeout_queue.
  * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
  */
-static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
+static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el,
+                            int do_wakeup)
 {
-    apr_time_t q_expiry;
+    apr_time_t elem_expiry;
     apr_time_t next_expiry;
 
     APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
@@ -298,16 +301,16 @@ static volatile apr_time_t  queues_next_expiry;
     ++*q->total;
     ++q->count;
 
-    /* Cheaply update the overall queues' next expiry according to the
-     * first entry of this queue (oldest), if necessary.
+    /* Cheaply update the global queues_next_expiry with the one of the
+     * first entry of this queue (oldest) if it expires before.
      */
     el = APR_RING_FIRST(&q->head);
-    q_expiry = el->queue_timestamp + q->timeout;
+    elem_expiry = el->queue_timestamp + q->timeout;
     next_expiry = queues_next_expiry;
-    if (!next_expiry || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
-        queues_next_expiry = q_expiry;
+    if (!next_expiry || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
+        queues_next_expiry = elem_expiry;
         /* Unblock the poll()ing listener for it to update its timeout. */
-        if (listener_is_wakeable) {
+        if (do_wakeup && listener_is_wakeable) {
             apr_pollset_wakeup(event_pollset);
         }
     }
@@ -554,9 +557,20 @@ static APR_INLINE int connections_above_limit(int
     return 1;
 }
 
-static void abort_socket_nonblocking(apr_socket_t *csd)
+static void close_socket_nonblocking_(apr_socket_t *csd,
+                                      const char *from, int line)
 {
     apr_status_t rv;
+    apr_os_sock_t fd = -1;
+
+    /* close_worker_sockets() may have closed it already */
+    rv = apr_os_sock_get(&fd, csd);
+    ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+                "closing socket %i/%pp from %s:%i", (int)fd, csd, from, line);
+    if (rv == APR_SUCCESS && fd == -1) {
+        return;
+    }
+
     apr_socket_timeout_set(csd, 0);
     rv = apr_socket_close(csd);
     if (rv != APR_SUCCESS) {
@@ -565,6 +579,8 @@ static APR_INLINE int connections_above_limit(int
         AP_DEBUG_ASSERT(0);
     }
 }
+#define close_socket_nonblocking(csd) \
+    close_socket_nonblocking_(csd, __FUNCTION__, __LINE__)
 
 static void close_worker_sockets(void)
 {
@@ -573,26 +589,16 @@ static void close_worker_sockets(void)
         apr_socket_t *csd = worker_sockets[i];
         if (csd) {
             worker_sockets[i] = NULL;
-            abort_socket_nonblocking(csd);
+            close_socket_nonblocking(csd);
         }
     }
-    for (;;) {
-        event_conn_state_t *cs = defer_linger_chain;
-        if (!cs) {
-            break;
-        }
-        if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
-                              cs) != cs) {
-            /* Race lost, try again */
-            continue;
-        }
-        cs->chain = NULL;
-        abort_socket_nonblocking(cs->pfd.desc.s);
-    }
 }
 
 static void wakeup_listener(void)
 {
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+                 "wake up listener%s", listener_may_exit ? " again" : "");
+
     listener_may_exit = 1;
     disable_listensocks();
 
@@ -779,7 +785,10 @@ static apr_status_t decrement_connection_count(voi
 {
     int is_last_connection;
     event_conn_state_t *cs = cs_;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "cleanup connection from state %i", (int)cs->pub.state);
     switch (cs->pub.state) {
+        case CONN_STATE_LINGER:
         case CONN_STATE_LINGER_NORMAL:
         case CONN_STATE_LINGER_SHORT:
             apr_atomic_dec32(&lingering_count);
@@ -800,6 +809,10 @@ static apr_status_t decrement_connection_count(voi
                 || (listeners_disabled() && !connections_above_limit(NULL)))) {
         apr_pollset_wakeup(event_pollset);
     }
+    if (dying) {
+        /* Help worker_thread_should_exit_early() */
+        ap_queue_interrupt_one(worker_queue);
+    }
     return APR_SUCCESS;
 }
 
@@ -817,66 +830,42 @@ static void notify_resume(event_conn_state_t *cs,
     ap_run_resume_connection(cs->c, cs->r);
 }
 
-/*
- * Close our side of the connection, flushing data to the client first.
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * May only be called by worker thread.
+/* Close the connection and release its resources (ptrans), either because an
+ * unrecoverable error occurred (queues or pollset add/remove) or more usually
+ * if lingering close timed out.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ *                in any timeout queue or in the pollset.
  */
-static int start_lingering_close_blocking(event_conn_state_t *cs)
+static void abort_connection(event_conn_state_t *cs)
 {
-    apr_socket_t *csd = cs->pfd.desc.s;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "abort connection from state %i", (int)cs->pub.state);
 
-    if (ap_start_lingering_close(cs->c)) {
-        notify_suspend(cs);
-        apr_socket_close(csd);
-        ap_queue_info_push_pool(worker_queue_info, cs->p);
-        return DONE;
-    }
-
-#ifdef AP_DEBUG
-    {
-        apr_status_t rv;
-        rv = apr_socket_timeout_set(csd, 0);
-        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-    }
-#else
-    apr_socket_timeout_set(csd, 0);
-#endif
-
-    cs->queue_timestamp = apr_time_now();
-    /*
-     * If some module requested a shortened waiting period, only wait for
-     * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
-     * DoS attacks.
-     */
-    if (apr_table_get(cs->c->notes, "short-lingering-close")) {
-        cs->pub.state = CONN_STATE_LINGER_SHORT;
-    }
-    else {
-        cs->pub.state = CONN_STATE_LINGER_NORMAL;
-    }
-    apr_atomic_inc32(&lingering_count);
-    notify_suspend(cs);
-
-    return OK;
+    close_socket_nonblocking(cs->pfd.desc.s);
+    ap_queue_info_push_pool(worker_queue_info, cs->p);
 }
 
 /*
  * Defer flush and close of the connection by adding it to defer_linger_chain,
  * for a worker to grab it and do the job (should that be blocking).
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- *                timeout_mutex is not locked
- * return: 1 connection is alive (but aside and about to linger)
- * May be called by listener thread.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ *                in any timeout queue or in the pollset.
  */
-static int start_lingering_close_nonblocking(event_conn_state_t *cs)
+static int defer_lingering_close(event_conn_state_t *cs)
 {
-    event_conn_state_t *chain;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "defer lingering close from state %i", (int)cs->pub.state);
+
+    /* The connection is not shutdown() yet strictly speaking, but it's not
+     * in any queue nor handled by a worker either (will be very soon), so
+     * to account for it somewhere we bump lingering_count now (and set
+     * deferred_linger for process_lingering_close() to know).
+     */
+    cs->pub.state = CONN_STATE_LINGER;
+    apr_atomic_inc32(&lingering_count);
+    cs->deferred_linger = 1;
     for (;;) {
-        cs->chain = chain = defer_linger_chain;
+        event_conn_state_t *chain = cs->chain = defer_linger_chain;
         if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
                               chain) != chain) {
             /* Race lost, try again */
@@ -886,22 +875,22 @@ static void notify_resume(event_conn_state_t *cs,
     }
 }
 
-/*
- * forcibly close a lingering connection after the lingering period has
- * expired
- * Pre-condition: cs is not in any timeout queue and not in the pollset
- * return: irrelevant (need same prototype as start_lingering_close)
+/* Shutdown the connection in case of timeout, error or resources shortage.
+ * This starts short lingering close if not already there, or directly closes
+ * the connection otherwise.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ *                in any timeout queue or in the pollset.
  */
-static int stop_lingering_close(event_conn_state_t *cs)
+static int shutdown_connection(event_conn_state_t *cs)
 {
-    apr_socket_t *csd = ap_get_conn_socket(cs->c);
-    ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
-                 "socket abort in state %i", (int)cs->pub.state);
-    abort_socket_nonblocking(csd);
-    ap_queue_info_push_pool(worker_queue_info, cs->p);
-    if (dying)
-        ap_queue_interrupt_one(worker_queue);
-    return 0;
+    if (cs->pub.state < CONN_STATE_LINGER) {
+        apr_table_setn(cs->c->notes, "short-lingering-close", "1");
+        defer_lingering_close(cs);
+    }
+    else {
+        abort_connection(cs);
+    }
+    return 1;
 }
 
 /*
@@ -973,9 +962,12 @@ static int event_post_read_request(request_rec *r)
 /* Forward declare */
 static void process_lingering_close(event_conn_state_t *cs);
 
-static void update_reqevents_from_sense(event_conn_state_t *cs)
+static void update_reqevents_from_sense(event_conn_state_t *cs, int sense)
 {
-    if (cs->pub.sense == CONN_SENSE_WANT_READ) {
+    if (sense < 0) {
+        sense = cs->pub.sense;
+    }
+    if (sense == CONN_SENSE_WANT_READ) {
         cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP;
     }
     else {
@@ -1020,14 +1012,14 @@ static void process_socket(apr_thread_t *thd, apr_
                                   apr_pool_cleanup_null);
         ap_set_module_config(c->conn_config, &mpm_event_module, cs);
         c->current_thread = thd;
+        c->cs = &cs->pub;
         cs->c = c;
-        c->cs = &(cs->pub);
         cs->p = p;
         cs->sc = ap_get_module_config(ap_server_conf->module_config,
                                       &mpm_event_module);
         cs->pfd.desc_type = APR_POLL_SOCKET;
-        cs->pfd.reqevents = APR_POLLIN;
         cs->pfd.desc.s = sock;
+        update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
         pt->type = PT_CSD;
         pt->baton = cs;
         cs->pfd.client_data = pt;
@@ -1168,9 +1160,9 @@ read_request:
             cs->queue_timestamp = apr_time_now();
             notify_suspend(cs);
 
-            update_reqevents_from_sense(cs);
+            update_reqevents_from_sense(cs, -1);
             apr_thread_mutex_lock(timeout_mutex);
-            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
+            TO_QUEUE_APPEND(cs->sc->wc_q, cs, 1);
             rv = apr_pollset_add(event_pollset, &cs->pfd);
             if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
                 AP_DEBUG_ASSERT(0);
@@ -1179,8 +1171,8 @@ read_request:
                 ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
                              "process_socket: apr_pollset_add failure for "
                              "write completion");
-                apr_socket_close(cs->pfd.desc.s);
-                ap_queue_info_push_pool(worker_queue_info, cs->p);
+                abort_connection(cs);
+                signal_threads(ST_GRACEFUL);
             }
             else {
                 apr_thread_mutex_unlock(timeout_mutex);
@@ -1189,8 +1181,7 @@ read_request:
         }
         if (pending != DECLINED
                 || c->aborted
-                || c->keepalive != AP_CONN_KEEPALIVE
-                || listener_may_exit) {
+                || c->keepalive != AP_CONN_KEEPALIVE) {
             cs->pub.state = CONN_STATE_LINGER;
         }
         else if (ap_run_input_pending(c) == OK) {
@@ -1197,9 +1188,12 @@ read_request:
             cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
             goto read_request;
         }
-        else {
+        else if (!listener_may_exit) {
             cs->pub.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
         }
+        else {
+            cs->pub.state = CONN_STATE_LINGER;
+        }
     }
 
     if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
@@ -1217,9 +1211,9 @@ read_request:
         notify_suspend(cs);
 
         /* Add work to pollset. */
-        cs->pfd.reqevents = APR_POLLIN;
+        update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
         apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
+        TO_QUEUE_APPEND(cs->sc->ka_q, cs, 1);
         rv = apr_pollset_add(event_pollset, &cs->pfd);
         if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
             AP_DEBUG_ASSERT(0);
@@ -1228,8 +1222,8 @@ read_request:
             ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
                          "process_socket: apr_pollset_add failure for "
                          "keep alive");
-            apr_socket_close(cs->pfd.desc.s);
-            ap_queue_info_push_pool(worker_queue_info, cs->p);
+            abort_connection(cs);
+            signal_threads(ST_GRACEFUL);
         }
         else {
             apr_thread_mutex_unlock(timeout_mutex);
@@ -1244,12 +1238,10 @@ read_request:
         return;
     }
 
-    if (cs->pub.state == CONN_STATE_LINGER) {
-        rc = start_lingering_close_blocking(cs);
-    }
-    if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
-                     cs->pub.state == CONN_STATE_LINGER_SHORT)) {
+    /* CONN_STATE_LINGER[_*] fall through process_lingering_close() */
+    if (cs->pub.state >= CONN_STATE_LINGER) {
         process_lingering_close(cs);
+        return;
     }
 }
 
@@ -1269,24 +1261,21 @@ static apr_status_t event_resume_suspended (conn_r
     apr_atomic_dec32(&suspended_count);
     c->suspended_baton = NULL;
 
-    if (cs->pub.state == CONN_STATE_LINGER) {
-        int rc = start_lingering_close_blocking(cs);
-        if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
-                         cs->pub.state == CONN_STATE_LINGER_SHORT)) {
-            process_lingering_close(cs);
-        }
-    }
-    else {
+    if (cs->pub.state < CONN_STATE_LINGER) {
         cs->queue_timestamp = apr_time_now();
         cs->pub.state = CONN_STATE_WRITE_COMPLETION;
         notify_suspend(cs);
 
-        update_reqevents_from_sense(cs);
+        update_reqevents_from_sense(cs, -1);
         apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_APPEND(cs->sc->wc_q, cs);
+        TO_QUEUE_APPEND(cs->sc->wc_q, cs, 1);
         apr_pollset_add(event_pollset, &cs->pfd);
         apr_thread_mutex_unlock(timeout_mutex);
     }
+    else {
+        cs->pub.state = CONN_STATE_LINGER;
+        process_lingering_close(cs);
+    }
 
     return OK;
 }
@@ -1307,12 +1296,17 @@ static void check_infinite_requests(void)
     }
 }
 
-static void close_listeners(int *closed)
+static int close_listeners(int *closed)
 {
+    ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
+                 "clos%s listeners (connection_count=%u)",
+                 *closed ? "ed" : "ing", apr_atomic_read32(&connection_count));
     if (!*closed) {
         int i;
+
         ap_close_listeners_ex(my_bucket->listeners);
-        *closed = 1;
+        *closed = 1; /* once */
+
         dying = 1;
         ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
         for (i = 0; i < threads_per_child; ++i) {
@@ -1324,7 +1318,10 @@ static void check_infinite_requests(void)
 
         ap_queue_info_free_idle_pools(worker_queue_info);
         ap_queue_interrupt_all(worker_queue);
+
+        return 1;
     }
+    return 0;
 }
 
 static void unblock_signal(int sig)
@@ -1417,11 +1414,16 @@ static apr_status_t push2worker(event_conn_state_t
         /* trash the connection; we couldn't queue the connected
          * socket to a worker
          */
-        if (csd) {
-            abort_socket_nonblocking(csd);
+        if (cs) {
+            shutdown_connection(cs);
         }
-        if (ptrans) {
-            ap_queue_info_push_pool(worker_queue_info, ptrans);
+        else {
+            if (csd) {
+                close_socket_nonblocking(csd);
+            }
+            if (ptrans) {
+                ap_queue_info_push_pool(worker_queue_info, ptrans);
+            }
         }
         signal_threads(ST_GRACEFUL);
     }
@@ -1537,8 +1539,8 @@ static timer_event_t * event_get_timer_event(apr_t
         /* Okay, add sorted by when.. */
         apr_skiplist_insert(timer_skiplist, te);
 
-        /* Cheaply update the overall timers' next expiry according to
-         * this event, if necessary.
+        /* Cheaply update the global timers_next_expiry with this event's
+         * if it expires before.
          */
         next_expiry = timers_next_expiry;
         if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) {
@@ -1657,10 +1659,13 @@ static apr_status_t event_register_poll_callback(a
 }
 
 /*
- * Close socket and clean up if remote closed its end while we were in
- * lingering close. Only to be called in the worker thread, and since it's
- * in immediate call stack, we can afford a comfortable buffer size to
- * consume data quickly.
+ * Flush data and close our side of the connection, then drain incoming data.
+ * If the latter would block put the connection in one of the linger timeout
+ * queues to be called back when ready, and repeat until it's closed by peer.
+ * Only to be called in the worker thread, and since it's in immediate call
+ * stack, we can afford a comfortable buffer size to consume data quickly.
+ * Pre-condition: cs is not in any timeout queue and not in the pollset,
+ *                timeout_mutex is not locked
  */
 #define LINGERING_BUF_SIZE (32 * 1024)
 static void process_lingering_close(event_conn_state_t *cs)
@@ -1671,7 +1676,39 @@ static void process_lingering_close(event_conn_sta
     apr_status_t rv;
     struct timeout_queue *q;
 
-    /* socket is already in non-blocking state */
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "process lingering close from state %i", (int)cs->pub.state);
+    AP_DEBUG_ASSERT(cs->pub.state >= CONN_STATE_LINGER);
+
+    if (cs->pub.state == CONN_STATE_LINGER) {
+        apr_socket_timeout_set(csd, apr_time_from_sec(SECONDS_TO_LINGER));
+        if (ap_start_lingering_close(cs->c)) {
+            notify_suspend(cs);
+            abort_connection(cs);
+            return;
+        }
+
+        /* defer_lingering_close() may have bumped lingering_count already */
+        if (!cs->deferred_linger) {
+            apr_atomic_inc32(&lingering_count);
+        }
+        notify_suspend(cs);
+
+        cs->queue_timestamp = apr_time_now();
+        /*
+         * If some module requested a shortened waiting period, only wait for
+         * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
+         * DoS attacks.
+         */
+        if (apr_table_get(cs->c->notes, "short-lingering-close")) {
+            cs->pub.state = CONN_STATE_LINGER_SHORT;
+        }
+        else {
+            cs->pub.state = CONN_STATE_LINGER_NORMAL;
+        }
+    }
+
+    apr_socket_timeout_set(csd, 0);
     do {
         nbytes = sizeof(dummybuf);
         rv = apr_socket_recv(csd, dummybuf, &nbytes);
@@ -1678,18 +1715,15 @@ static void process_lingering_close(event_conn_sta
     } while (rv == APR_SUCCESS);
 
     if (!APR_STATUS_IS_EAGAIN(rv)) {
-        rv = apr_socket_close(csd);
-        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-        ap_queue_info_push_pool(worker_queue_info, cs->p);
+        abort_connection(cs);
         return;
     }
 
-    /* Re-queue the connection to come back when readable */
-    cs->pfd.reqevents = APR_POLLIN;
-    cs->pub.sense = CONN_SENSE_DEFAULT;
+    /* (Re)queue the connection to come back when readable */
+    update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
     q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
     apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_APPEND(q, cs);
+    TO_QUEUE_APPEND(q, cs, 1);
     rv = apr_pollset_add(event_pollset, &cs->pfd);
     if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
         AP_DEBUG_ASSERT(0);
@@ -1697,20 +1731,18 @@ static void process_lingering_close(event_conn_sta
         apr_thread_mutex_unlock(timeout_mutex);
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
                      "process_lingering_close: apr_pollset_add failure");
-        rv = apr_socket_close(cs->pfd.desc.s);
-        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-        ap_queue_info_push_pool(worker_queue_info, cs->p);
+        abort_connection(cs);
+        signal_threads(ST_GRACEFUL);
         return;
     }
     apr_thread_mutex_unlock(timeout_mutex);
 }
 
-/* call 'func' for all elements of 'q' with timeout less than 'timeout_time'.
+/* call 'func' for all elements of 'q' above 'expiry'.
  * Pre-condition: timeout_mutex must already be locked
  * Post-condition: timeout_mutex will be locked again
  */
-static void process_timeout_queue(struct timeout_queue *q,
-                                  apr_time_t timeout_time,
+static void process_timeout_queue(struct timeout_queue *q, apr_time_t expiry,
                                   int (*func)(event_conn_state_t *))
 {
     apr_uint32_t total = 0, count;
@@ -1730,10 +1762,10 @@ static void process_lingering_close(event_conn_sta
         while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
                                        timeout_list)) {
             /* Trash the entry if:
-             * - no timeout_time was given (asked for all), or
+             * - no expiry was given (zero means all), or
              * - it expired (according to the queue timeout), or
              * - the system clock skewed in the past: no entry should be
-             *   registered above the given timeout_time (~now) + the queue
+             *   registered above the given expiry (~now) + the queue
              *   timeout, we won't keep any here (eg. for centuries).
              *
              * Otherwise stop, no following entry will match thanks to the
@@ -1740,17 +1772,16 @@ static void process_lingering_close(event_conn_sta
              * single timeout per queue (entries are added to the end!).
              * This allows maintenance in O(1).
              */
-            if (timeout_time
-                    && cs->queue_timestamp + qp->timeout > timeout_time
-                    && cs->queue_timestamp < timeout_time + qp->timeout) {
-                /* Since this is the next expiring of this queue, update the
-                 * overall queues' next expiry if it's later than this one.
+            if (expiry && cs->queue_timestamp + qp->timeout > expiry
+                       && cs->queue_timestamp < expiry + qp->timeout) {
+                /* Since this is the next expiring entry of this queue, update
+                 * the global queues_next_expiry if it's later than this one.
                  */
-                apr_time_t q_expiry = cs->queue_timestamp + qp->timeout;
+                apr_time_t elem_expiry = cs->queue_timestamp + qp->timeout;
                 apr_time_t next_expiry = queues_next_expiry;
                 if (!next_expiry
-                        || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
-                    queues_next_expiry = q_expiry;
+                        || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
+                    queues_next_expiry = elem_expiry;
                 }
                 break;
             }
@@ -1790,18 +1821,17 @@ static void process_lingering_close(event_conn_sta
     apr_thread_mutex_lock(timeout_mutex);
 }
 
-static void process_keepalive_queue(apr_time_t timeout_time)
+static void process_keepalive_queue(apr_time_t expiry)
 {
     /* If all workers are busy, we kill older keep-alive connections so
      * that they may connect to another process.
      */
-    if (!timeout_time) {
+    if (!expiry && *keepalive_q->total) {
         ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
-                     "All workers are busy or dying, will close %u "
+                     "All workers are busy or dying, will kill %u "
                      "keep-alive connections", *keepalive_q->total);
     }
-    process_timeout_queue(keepalive_q, timeout_time,
-                          start_lingering_close_nonblocking);
+    process_timeout_queue(keepalive_q, expiry, shutdown_connection);
 }
 
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
@@ -1831,9 +1861,9 @@ static void * APR_THREAD_FUNC listener_thread(apr_
         timer_event_t *te;
         const apr_pollfd_t *out_pfd;
         apr_int32_t num = 0;
-        apr_interval_time_t timeout_interval;
+        apr_interval_time_t timeout;
         socket_callback_baton_t *user_chain;
-        apr_time_t now, timeout_time;
+        apr_time_t now, expiry = -1;
         int workers_were_busy = 0;
 
         if (conns_this_child <= 0)
@@ -1840,10 +1870,20 @@ static void * APR_THREAD_FUNC listener_thread(apr_
             check_infinite_requests();
 
         if (listener_may_exit) {
-            close_listeners(&closed);
+            int first_close = close_listeners(&closed);
+
             if (terminate_mode == ST_UNGRACEFUL
                 || apr_atomic_read32(&connection_count) == 0)
                 break;
+
+            /* Don't wait in poll() for the first close (i.e. dying now), we
+             * want to maintain the queues and schedule defer_linger_chain ASAP
+             * to kill kept-alive connection and shutdown the workers and child
+             * faster.
+             */
+            if (first_close) {
+                goto do_maintenance;
+            }
         }
 
         if (APLOGtrace6(ap_server_conf)) {
@@ -1857,8 +1897,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                              "keep-alive: %d lingering: %d suspended: %u)",
                              apr_atomic_read32(&connection_count),
                              apr_atomic_read32(&clogged_count),
-                             *(volatile apr_uint32_t*)write_completion_q->total,
-                             *(volatile apr_uint32_t*)keepalive_q->total,
+                             apr_atomic_read32(write_completion_q->total),
+                             apr_atomic_read32(keepalive_q->total),
                              apr_atomic_read32(&lingering_count),
                              apr_atomic_read32(&suspended_count));
                 if (dying) {
@@ -1886,18 +1926,18 @@ static void * APR_THREAD_FUNC listener_thread(apr_
          * up occurs, otherwise periodic checks (maintenance, shutdown, ...)
          * must be performed.
          */
-        timeout_interval = -1;
+        timeout = -1;
 
         /* Push expired timers to a worker, the first remaining one determines
          * the maximum time to poll() below, if any.
          */
-        timeout_time = timers_next_expiry;
-        if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) {
+        expiry = timers_next_expiry;
+        if (expiry && expiry < now + EVENT_FUDGE_FACTOR) {
             apr_thread_mutex_lock(g_timer_skiplist_mtx);
             while ((te = apr_skiplist_peek(timer_skiplist))) {
                 if (te->when > now + EVENT_FUDGE_FACTOR) {
                     timers_next_expiry = te->when;
-                    timeout_interval = te->when - now;
+                    timeout = te->when - now;
                     break;
                 }
                 apr_skiplist_pop(timer_skiplist, NULL);
@@ -1921,37 +1961,40 @@ static void * APR_THREAD_FUNC listener_thread(apr_
         }
 
         /* Same for queues, use their next expiry, if any. */
-        timeout_time = queues_next_expiry;
-        if (timeout_time
-                && (timeout_interval < 0
-                    || timeout_time <= now
-                    || timeout_interval > timeout_time - now)) {
-            timeout_interval = timeout_time > now ? timeout_time - now : 1;
+        expiry = queues_next_expiry;
+        if (expiry
+                && (timeout < 0
+                    || expiry <= now
+                    || timeout > expiry - now)) {
+            timeout = expiry > now ? expiry - now : 0;
         }
 
         /* When non-wakeable, don't wait more than 100 ms, in any case. */
 #define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100)
         if (!listener_is_wakeable
-                && (timeout_interval < 0
-                    || timeout_interval > NON_WAKEABLE_POLL_TIMEOUT)) {
-            timeout_interval = NON_WAKEABLE_POLL_TIMEOUT;
+                && (timeout < 0
+                    || timeout > NON_WAKEABLE_POLL_TIMEOUT)) {
+            timeout = NON_WAKEABLE_POLL_TIMEOUT;
         }
+        else if (timeout > 0) {
+            /* apr_pollset_poll() might round down the timeout to milliseconds,
+             * let's forcibly round up here to never return before the timeout.
+             */
+            timeout = apr_time_from_msec(
+                apr_time_as_msec(timeout + apr_time_from_msec(1) - 1)
+            );
+        }
 
-        rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
+        ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+                     "poll()ing with timeout=%" APR_TIME_T_FMT
+                     " queues_timeout=%" APR_TIME_T_FMT
+                     " timers_timeout=%" APR_TIME_T_FMT,
+                     timeout, queues_next_expiry - now,
+                     timers_next_expiry - now);
+
+        rc = apr_pollset_poll(event_pollset, timeout, &num, &out_pfd);
         if (rc != APR_SUCCESS) {
-            if (APR_STATUS_IS_EINTR(rc)) {
-                /* Woken up, if we are exiting or listeners are disabled we
-                 * must fall through to kill kept-alive connections or test
-                 * whether listeners should be re-enabled. Otherwise we only
-                 * need to update timeouts (logic is above, so simply restart
-                 * the loop).
-                 */
-                if (!listener_may_exit && !listeners_disabled()) {
-                    continue;
-                }
-                timeout_time = 0;
-            }
-            else if (!APR_STATUS_IS_TIMEUP(rc)) {
+            if (!APR_STATUS_IS_EINTR(rc) && !APR_STATUS_IS_TIMEUP(rc)) {
                 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
                              APLOGNO(03267)
                              "apr_pollset_poll failed.  Attempting to "
@@ -1961,13 +2004,21 @@ static void * APR_THREAD_FUNC listener_thread(apr_
             num = 0;
         }
 
-        if (listener_may_exit) {
-            close_listeners(&closed);
-            if (terminate_mode == ST_UNGRACEFUL
-                || apr_atomic_read32(&connection_count) == 0)
-                break;
+        if (APLOGtrace8(ap_server_conf)) {
+            now = apr_time_now();
+            ap_log_error(APLOG_MARK, APLOG_TRACE8, rc, ap_server_conf,
+                         "poll()ed with num=%u exit=%d/%d conns=%d"
+                         " queues_timeout=%" APR_TIME_T_FMT
+                         " timers_timeout=%" APR_TIME_T_FMT,
+                         num, listener_may_exit, dying,
+                         apr_atomic_read32(&connection_count),
+                         queues_next_expiry - now, timers_next_expiry - now);
         }
 
+        /* XXX possible optimization: stash the current time for use as
+         * r->request_time for new requests or queues maintenance
+         */
+
         for (user_chain = NULL; num; --num, ++out_pfd) {
             listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
@@ -2020,7 +2071,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                         AP_DEBUG_ASSERT(0);
                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                                      APLOGNO(03094) "pollset remove failed");
-                        start_lingering_close_nonblocking(cs);
+                        abort_connection(cs);
+                        signal_threads(ST_GRACEFUL);
                         break;
                     }
 
@@ -2027,18 +2079,13 @@ static void * APR_THREAD_FUNC listener_thread(apr_
                     /* If we don't get a worker immediately (nonblocking), we
                      * close the connection; the client can re-connect to a
                      * different process for keepalive, and for lingering close
-                     * the connection will be reset so the choice is to favor
+                     * the connection will be killed so the choice is to favor
                      * incoming/alive connections.
                      */
                     get_worker(&have_idle_worker, blocking,
                                &workers_were_busy);
                     if (!have_idle_worker) {
-                        if (remove_from_q == cs->sc->ka_q) {
-                            start_lingering_close_nonblocking(cs);
-                        }
-                        else {
-                            stop_lingering_close(cs);
-                        }
+                        shutdown_connection(cs);
                     }
                     else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
                         have_idle_worker = 0;
@@ -2174,18 +2221,20 @@ static void * APR_THREAD_FUNC listener_thread(apr_
             push_timer2worker(te);
         }
 
-        /* XXX possible optimization: stash the current time for use as
-         * r->request_time for new requests
-         */
-        /* We process the timeout queues here only when their overall next
-         * expiry (read once above) is over. This happens accurately since
+do_maintenance:
+        /* We process the timeout queues here only when the global
+         * queues_next_expiry is passed. This happens accurately since
          * adding to the queues (in workers) can only decrease this expiry,
          * while latest ones are only taken into account here (in listener)
          * during queues' processing, with the lock held. This works both
          * with and without wake-ability.
          */
-        if (timeout_time && timeout_time < (now = apr_time_now())) {
-            /* handle timed out sockets */
+        expiry = queues_next_expiry;
+        if (expiry && expiry < (now = apr_time_now())) {
+            ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+                         "processing queues: expiry=%" APR_TIME_T_FMT,
+                         expiry - now);
+
             apr_thread_mutex_lock(timeout_mutex);
 
             /* Processing all the queues below will recompute this. */
@@ -2198,26 +2247,43 @@ static void * APR_THREAD_FUNC listener_thread(apr_
             else {
                 process_keepalive_queue(now);
             }
+            ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+                         "ka_q processed: expiry=%" APR_TIME_T_FMT,
+                         queues_next_expiry > now ? queues_next_expiry - now : 0);
+
             /* Step 2: write completion timeouts */
             process_timeout_queue(write_completion_q, now,
-                                  start_lingering_close_nonblocking);
+                                  defer_lingering_close);
+            ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+                         "wc_q processed: expiry=%" APR_TIME_T_FMT,
+                         queues_next_expiry > now ? queues_next_expiry - now : 0);
+
             /* Step 3: (normal) lingering close completion timeouts */
-            process_timeout_queue(linger_q, now,
-                                  stop_lingering_close);
+            if (dying && linger_q->timeout > short_linger_q->timeout) {
+                /* Dying, force short timeout for normal lingering close */
+                linger_q->timeout = short_linger_q->timeout;
+            }
+            process_timeout_queue(linger_q, now, shutdown_connection);
+            ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+                         "linger_q processed: expiry=%" APR_TIME_T_FMT,
+                         queues_next_expiry > now ? queues_next_expiry - now : 0);
+
             /* Step 4: (short) lingering close completion timeouts */
-            process_timeout_queue(short_linger_q, now,
-                                  stop_lingering_close);
+            process_timeout_queue(short_linger_q, now, shutdown_connection);
+            ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+                         "short_linger_q processed: expiry=%" APR_TIME_T_FMT,
+                         queues_next_expiry > now ? queues_next_expiry - now : 0);
 
             apr_thread_mutex_unlock(timeout_mutex);
 
-            ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
-            ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
+            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+            ps->write_completion = apr_atomic_read32(write_completion_q->total);
             ps->connections = apr_atomic_read32(&connection_count);
             ps->suspended = apr_atomic_read32(&suspended_count);
             ps->lingering_close = apr_atomic_read32(&lingering_count);
         }
         else if ((workers_were_busy || dying)
-                 && *(volatile apr_uint32_t*)keepalive_q->total) {
+                 && apr_atomic_read32(keepalive_q->total)) {
             apr_thread_mutex_lock(timeout_mutex);
             process_keepalive_queue(0); /* kill'em all \m/ */
             apr_thread_mutex_unlock(timeout_mutex);
@@ -2249,7 +2315,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
         }
     } /* listener main loop */
 
-    close_listeners(&closed);
     ap_queue_term(worker_queue);
 
     apr_thread_exit(thd, APR_SUCCESS);
@@ -2397,15 +2462,9 @@ static void *APR_THREAD_FUNC worker_thread(apr_thr
                 continue;
             }
             cs->chain = NULL;
+            AP_DEBUG_ASSERT(cs->pub.state == CONN_STATE_LINGER);
 
             worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
-#ifdef AP_DEBUG
-            rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
-            AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-#else
-            apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
-#endif
-            cs->pub.state = CONN_STATE_LINGER;
             process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
             worker_sockets[thread_slot] = NULL;
         }
@@ -2571,7 +2630,7 @@ static void setup_threads_runtime(void)
         AP_DEBUG_ASSERT(i < num_listensocks);
         pfd = &listener_pollfd[i];
 
-        pfd->reqevents = APR_POLLIN;
+        pfd->reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
         pfd->desc_type = APR_POLL_SOCKET;
         pfd->desc.s = lr->sd;
 
@@ -2703,13 +2762,17 @@ static void join_workers(apr_thread_t * listener,
          */
 
         iter = 0;
-        while (iter < 10 && !dying) {
+        while (!dying) {
+            apr_sleep(apr_time_from_msec(500));
+            if (dying || ++iter > 10) {
+                break;
+            }
             /* listener has not stopped accepting yet */
-            apr_sleep(apr_time_make(0, 500000));
+            ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+                         "listener has not stopped accepting yet (%d iter)", iter);
             wakeup_listener();
-            ++iter;
         }
-        if (iter >= 10) {
+        if (iter > 10) {
             ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00475)
                          "the listener thread didn't stop accepting");
         }
@@ -2918,7 +2981,13 @@ static void child_main(int child_num_arg, int chil
          *   If the worker hasn't exited, then this blocks until
          *   they have (then cleans up).
          */
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                     "%s termination received, joining workers",
+                     rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
         join_workers(ts->listener, threads);
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                     "%s termination, workers joined, exiting",
+                     rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
     }
 
     free(threads);
Index: server/connection.c
===================================================================
--- server/connection.c	(revision 1891501)
+++ server/connection.c	(working copy)
@@ -139,18 +139,12 @@ AP_DECLARE(int) ap_start_lingering_close(conn_rec
     ap_flush_conn(c);
 
 #ifdef NO_LINGCLOSE
-    apr_socket_close(csd);
     return 1;
 #else
     /* Shut down the socket for write, which will send a FIN
      * to the peer.
      */
-    if (c->aborted
-            || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
-        apr_socket_close(csd);
-        return 1;
-    }
-    return 0;
+    return (c->aborted || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE));
 #endif
 }
 
@@ -162,6 +156,7 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c)
     apr_socket_t *csd = ap_get_conn_socket(c);
 
     if (ap_start_lingering_close(c)) {
+        apr_socket_close(csd);
         return;
     }
 
Index: server/mpm_fdqueue.c
===================================================================
--- server/mpm_fdqueue.c	(revision 1891501)
+++ server/mpm_fdqueue.c	(working copy)
@@ -493,6 +493,10 @@ static apr_status_t queue_interrupt(fd_queue_t *qu
 {
     apr_status_t rv;
 
+    if (queue->terminated) {
+        return APR_EOF;
+    }
+
     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
         return rv;
     }
Index: server/mpm_fdqueue.h
===================================================================
--- server/mpm_fdqueue.h	(revision 1891501)
+++ server/mpm_fdqueue.h	(working copy)
@@ -83,7 +83,7 @@ struct fd_queue_t
     unsigned int out;
     apr_thread_mutex_t *one_big_mutex;
     apr_thread_cond_t *not_empty;
-    int terminated;
+    volatile int terminated;
 };
 typedef struct fd_queue_t fd_queue_t;
 
