On Tue, Jul 13, 2021 at 4:16 PM Stefan Eissing
<[email protected]> wrote:
>
> In Yann's v3 of the patch, it triggers only when I stop the server while the
> test case is ongoing,
OK thanks, I have a new v6 which should address this and also still
call pre_close hooks in any case.
The patch is a bit "verbose" (some cosmetic/comment changes that
helped my workflow, sorry about that) but in the end I think it's
simpler to maintain (as Eric asked?), which may be worth it.
>
> [Tue Jul 13 14:12:18.800125 2021] [mpm_event:trace4] [pid 47786:tid
> 123145484374016] event.c(563): closing socket 15/7f95ba07b4a0
> [Tue Jul 13 14:12:19.551039 2021] [mpm_event:trace4] [pid 47787:tid
> 123145488666624] event.c(1322): closing listeners (connection_count=0)
> [Tue Jul 13 14:12:19.551146 2021] [mpm_event:trace4] [pid 47786:tid
> 123145497251840] event.c(1322): closing listeners (connection_count=1)
> [Tue Jul 13 14:12:19.551176 2021] [mpm_event:trace4] [pid 47788:tid
> 123145488666624] event.c(1322): closing listeners (connection_count=0)
> [Tue Jul 13 14:12:19.551209 2021] [mpm_event:trace4] [pid 47786:tid
> 4476272128] event.c(563): closing socket 15/7f95ba07b4b0
> [Tue Jul 13 14:12:19.552305 2021] [mpm_event:trace4] [pid 47792:tid
> 123145488666624] event.c(1322): closing listeners (connection_count=0)
> [Tue Jul 13 14:12:19.805283 2021] [mpm_event:trace4] [pid 47786:tid
> 123145484910592] event.c(563): closing socket -1/7f95ba07b4b0
> [Tue Jul 13 14:12:19.805289 2021] [mpm_event:error] [pid 47786:tid
> 123145484910592] (9)Bad file descriptor: AH00468: error closing socket
> -1/7f95ba07b4b0
>
> You can see that socket 7f95ba07b4b0 is closed twice.
Indeed, where does the first close come from (the caller)?
If you see this with the new patch (LogLevel mpm_event:trace8), can
you confirm that it's from close_worker_sockets() (i.e. for ungraceful
restart) only?
Cheers;
Yann.
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c (revision 1891501)
+++ server/mpm/event/event.c (working copy)
@@ -254,6 +254,8 @@ struct event_conn_state_t {
conn_state_t pub;
/** chaining in defer_linger_chain */
struct event_conn_state_t *chain;
+ /** Is lingering close from defer_lingering_close()? */
+ int deferred_linger;
};
APR_RING_HEAD(timeout_head_t, event_conn_state_t);
@@ -289,9 +291,10 @@ static volatile apr_time_t queues_next_expiry;
* Macros for accessing struct timeout_queue.
* For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
*/
-static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
+static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el,
+ int do_wakeup)
{
- apr_time_t q_expiry;
+ apr_time_t elem_expiry;
apr_time_t next_expiry;
APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
@@ -298,16 +301,16 @@ static volatile apr_time_t queues_next_expiry;
++*q->total;
++q->count;
- /* Cheaply update the overall queues' next expiry according to the
- * first entry of this queue (oldest), if necessary.
+ /* Cheaply update the global queues_next_expiry with the one of the
+ * first entry of this queue (oldest) if it expires before.
*/
el = APR_RING_FIRST(&q->head);
- q_expiry = el->queue_timestamp + q->timeout;
+ elem_expiry = el->queue_timestamp + q->timeout;
next_expiry = queues_next_expiry;
- if (!next_expiry || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
- queues_next_expiry = q_expiry;
+ if (!next_expiry || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
+ queues_next_expiry = elem_expiry;
/* Unblock the poll()ing listener for it to update its timeout. */
- if (listener_is_wakeable) {
+ if (do_wakeup && listener_is_wakeable) {
apr_pollset_wakeup(event_pollset);
}
}
@@ -554,9 +557,20 @@ static APR_INLINE int connections_above_limit(int
return 1;
}
-static void abort_socket_nonblocking(apr_socket_t *csd)
+static void close_socket_nonblocking_(apr_socket_t *csd,
+ const char *from, int line)
{
apr_status_t rv;
+ apr_os_sock_t fd = -1;
+
+ /* close_worker_sockets() may have closed it already */
+ rv = apr_os_sock_get(&fd, csd);
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "closing socket %i/%pp from %s:%i", (int)fd, csd, from, line);
+ if (rv == APR_SUCCESS && fd == -1) {
+ return;
+ }
+
apr_socket_timeout_set(csd, 0);
rv = apr_socket_close(csd);
if (rv != APR_SUCCESS) {
@@ -565,6 +579,8 @@ static APR_INLINE int connections_above_limit(int
AP_DEBUG_ASSERT(0);
}
}
+#define close_socket_nonblocking(csd) \
+ close_socket_nonblocking_(csd, __FUNCTION__, __LINE__)
static void close_worker_sockets(void)
{
@@ -573,26 +589,16 @@ static void close_worker_sockets(void)
apr_socket_t *csd = worker_sockets[i];
if (csd) {
worker_sockets[i] = NULL;
- abort_socket_nonblocking(csd);
+ close_socket_nonblocking(csd);
}
}
- for (;;) {
- event_conn_state_t *cs = defer_linger_chain;
- if (!cs) {
- break;
- }
- if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
- cs) != cs) {
- /* Race lost, try again */
- continue;
- }
- cs->chain = NULL;
- abort_socket_nonblocking(cs->pfd.desc.s);
- }
}
static void wakeup_listener(void)
{
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+ "wake up listener%s", listener_may_exit ? " again" : "");
+
listener_may_exit = 1;
disable_listensocks();
@@ -779,7 +785,10 @@ static apr_status_t decrement_connection_count(voi
{
int is_last_connection;
event_conn_state_t *cs = cs_;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "cleanup connection from state %i", (int)cs->pub.state);
switch (cs->pub.state) {
+ case CONN_STATE_LINGER:
case CONN_STATE_LINGER_NORMAL:
case CONN_STATE_LINGER_SHORT:
apr_atomic_dec32(&lingering_count);
@@ -800,6 +809,10 @@ static apr_status_t decrement_connection_count(voi
|| (listeners_disabled() && !connections_above_limit(NULL)))) {
apr_pollset_wakeup(event_pollset);
}
+ if (dying) {
+ /* Help worker_thread_should_exit_early() */
+ ap_queue_interrupt_one(worker_queue);
+ }
return APR_SUCCESS;
}
@@ -817,66 +830,42 @@ static void notify_resume(event_conn_state_t *cs,
ap_run_resume_connection(cs->c, cs->r);
}
-/*
- * Close our side of the connection, flushing data to the client first.
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- * timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- * 1 if connection is lingering
- * May only be called by worker thread.
+/* Close the connection and release its resources (ptrans), either because an
+ * unrecoverable error occurred (queues or pollset add/remove) or more usually
+ * if lingering close timed out.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
*/
-static int start_lingering_close_blocking(event_conn_state_t *cs)
+static void abort_connection(event_conn_state_t *cs)
{
- apr_socket_t *csd = cs->pfd.desc.s;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "abort connection from state %i", (int)cs->pub.state);
- if (ap_start_lingering_close(cs->c)) {
- notify_suspend(cs);
- apr_socket_close(csd);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
- return DONE;
- }
-
-#ifdef AP_DEBUG
- {
- apr_status_t rv;
- rv = apr_socket_timeout_set(csd, 0);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- }
-#else
- apr_socket_timeout_set(csd, 0);
-#endif
-
- cs->queue_timestamp = apr_time_now();
- /*
- * If some module requested a shortened waiting period, only wait for
- * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
- * DoS attacks.
- */
- if (apr_table_get(cs->c->notes, "short-lingering-close")) {
- cs->pub.state = CONN_STATE_LINGER_SHORT;
- }
- else {
- cs->pub.state = CONN_STATE_LINGER_NORMAL;
- }
- apr_atomic_inc32(&lingering_count);
- notify_suspend(cs);
-
- return OK;
+ close_socket_nonblocking(cs->pfd.desc.s);
+ ap_queue_info_push_pool(worker_queue_info, cs->p);
}
/*
* Defer flush and close of the connection by adding it to defer_linger_chain,
* for a worker to grab it and do the job (should that be blocking).
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- * timeout_mutex is not locked
- * return: 1 connection is alive (but aside and about to linger)
- * May be called by listener thread.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
*/
-static int start_lingering_close_nonblocking(event_conn_state_t *cs)
+static int defer_lingering_close(event_conn_state_t *cs)
{
- event_conn_state_t *chain;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "defer lingering close from state %i", (int)cs->pub.state);
+
+ /* The connection is not shutdown() yet strictly speaking, but it's not
+ * in any queue nor handled by a worker either (will be very soon), so
+ * to account for it somewhere we bump lingering_count now (and set
+ * deferred_linger for process_lingering_close() to know).
+ */
+ cs->pub.state = CONN_STATE_LINGER;
+ apr_atomic_inc32(&lingering_count);
+ cs->deferred_linger = 1;
for (;;) {
- cs->chain = chain = defer_linger_chain;
+ event_conn_state_t *chain = cs->chain = defer_linger_chain;
if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
chain) != chain) {
/* Race lost, try again */
@@ -886,22 +875,22 @@ static void notify_resume(event_conn_state_t *cs,
}
}
-/*
- * forcibly close a lingering connection after the lingering period has
- * expired
- * Pre-condition: cs is not in any timeout queue and not in the pollset
- * return: irrelevant (need same prototype as start_lingering_close)
+/* Shut down the connection in case of timeout, error or resource shortage.
+ * This starts short lingering close if not already there, or directly closes
+ * the connection otherwise.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
*/
-static int stop_lingering_close(event_conn_state_t *cs)
+static int shutdown_connection(event_conn_state_t *cs)
{
- apr_socket_t *csd = ap_get_conn_socket(cs->c);
- ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
- "socket abort in state %i", (int)cs->pub.state);
- abort_socket_nonblocking(csd);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
- if (dying)
- ap_queue_interrupt_one(worker_queue);
- return 0;
+ if (cs->pub.state < CONN_STATE_LINGER) {
+ apr_table_setn(cs->c->notes, "short-lingering-close", "1");
+ defer_lingering_close(cs);
+ }
+ else {
+ abort_connection(cs);
+ }
+ return 1;
}
/*
@@ -973,9 +962,12 @@ static int event_post_read_request(request_rec *r)
/* Forward declare */
static void process_lingering_close(event_conn_state_t *cs);
-static void update_reqevents_from_sense(event_conn_state_t *cs)
+static void update_reqevents_from_sense(event_conn_state_t *cs, int sense)
{
- if (cs->pub.sense == CONN_SENSE_WANT_READ) {
+ if (sense < 0) {
+ sense = cs->pub.sense;
+ }
+ if (sense == CONN_SENSE_WANT_READ) {
cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP;
}
else {
@@ -1020,14 +1012,14 @@ static void process_socket(apr_thread_t *thd, apr_
apr_pool_cleanup_null);
ap_set_module_config(c->conn_config, &mpm_event_module, cs);
c->current_thread = thd;
+ c->cs = &cs->pub;
cs->c = c;
- c->cs = &(cs->pub);
cs->p = p;
cs->sc = ap_get_module_config(ap_server_conf->module_config,
&mpm_event_module);
cs->pfd.desc_type = APR_POLL_SOCKET;
- cs->pfd.reqevents = APR_POLLIN;
cs->pfd.desc.s = sock;
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
pt->type = PT_CSD;
pt->baton = cs;
cs->pfd.client_data = pt;
@@ -1168,9 +1160,9 @@ read_request:
cs->queue_timestamp = apr_time_now();
notify_suspend(cs);
- update_reqevents_from_sense(cs);
+ update_reqevents_from_sense(cs, -1);
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(cs->sc->wc_q, cs);
+ TO_QUEUE_APPEND(cs->sc->wc_q, cs, 1);
rv = apr_pollset_add(event_pollset, &cs->pfd);
if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
AP_DEBUG_ASSERT(0);
@@ -1179,8 +1171,8 @@ read_request:
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
"process_socket: apr_pollset_add failure for "
"write completion");
- apr_socket_close(cs->pfd.desc.s);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ abort_connection(cs);
+ signal_threads(ST_GRACEFUL);
}
else {
apr_thread_mutex_unlock(timeout_mutex);
@@ -1189,8 +1181,7 @@ read_request:
}
if (pending != DECLINED
|| c->aborted
- || c->keepalive != AP_CONN_KEEPALIVE
- || listener_may_exit) {
+ || c->keepalive != AP_CONN_KEEPALIVE) {
cs->pub.state = CONN_STATE_LINGER;
}
else if (ap_run_input_pending(c) == OK) {
@@ -1197,9 +1188,12 @@ read_request:
cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
goto read_request;
}
- else {
+ else if (!listener_may_exit) {
cs->pub.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
}
+ else {
+ cs->pub.state = CONN_STATE_LINGER;
+ }
}
if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
@@ -1217,9 +1211,9 @@ read_request:
notify_suspend(cs);
/* Add work to pollset. */
- cs->pfd.reqevents = APR_POLLIN;
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(cs->sc->ka_q, cs);
+ TO_QUEUE_APPEND(cs->sc->ka_q, cs, 1);
rv = apr_pollset_add(event_pollset, &cs->pfd);
if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
AP_DEBUG_ASSERT(0);
@@ -1228,8 +1222,8 @@ read_request:
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
"process_socket: apr_pollset_add failure for "
"keep alive");
- apr_socket_close(cs->pfd.desc.s);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ abort_connection(cs);
+ signal_threads(ST_GRACEFUL);
}
else {
apr_thread_mutex_unlock(timeout_mutex);
@@ -1244,12 +1238,10 @@ read_request:
return;
}
- if (cs->pub.state == CONN_STATE_LINGER) {
- rc = start_lingering_close_blocking(cs);
- }
- if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
- cs->pub.state == CONN_STATE_LINGER_SHORT)) {
+ /* CONN_STATE_LINGER[_*] fall through to process_lingering_close() */
+ if (cs->pub.state >= CONN_STATE_LINGER) {
process_lingering_close(cs);
+ return;
}
}
@@ -1269,24 +1261,21 @@ static apr_status_t event_resume_suspended (conn_r
apr_atomic_dec32(&suspended_count);
c->suspended_baton = NULL;
- if (cs->pub.state == CONN_STATE_LINGER) {
- int rc = start_lingering_close_blocking(cs);
- if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
- cs->pub.state == CONN_STATE_LINGER_SHORT)) {
- process_lingering_close(cs);
- }
- }
- else {
+ if (cs->pub.state < CONN_STATE_LINGER) {
cs->queue_timestamp = apr_time_now();
cs->pub.state = CONN_STATE_WRITE_COMPLETION;
notify_suspend(cs);
- update_reqevents_from_sense(cs);
+ update_reqevents_from_sense(cs, -1);
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(cs->sc->wc_q, cs);
+ TO_QUEUE_APPEND(cs->sc->wc_q, cs, 1);
apr_pollset_add(event_pollset, &cs->pfd);
apr_thread_mutex_unlock(timeout_mutex);
}
+ else {
+ cs->pub.state = CONN_STATE_LINGER;
+ process_lingering_close(cs);
+ }
return OK;
}
@@ -1307,12 +1296,17 @@ static void check_infinite_requests(void)
}
}
-static void close_listeners(int *closed)
+static int close_listeners(int *closed)
{
+ ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
+ "clos%s listeners (connection_count=%u)",
+ *closed ? "ed" : "ing", apr_atomic_read32(&connection_count));
if (!*closed) {
int i;
+
ap_close_listeners_ex(my_bucket->listeners);
- *closed = 1;
+ *closed = 1; /* once */
+
dying = 1;
ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
for (i = 0; i < threads_per_child; ++i) {
@@ -1324,7 +1318,10 @@ static void check_infinite_requests(void)
ap_queue_info_free_idle_pools(worker_queue_info);
ap_queue_interrupt_all(worker_queue);
+
+ return 1;
}
+ return 0;
}
static void unblock_signal(int sig)
@@ -1417,11 +1414,16 @@ static apr_status_t push2worker(event_conn_state_t
/* trash the connection; we couldn't queue the connected
* socket to a worker
*/
- if (csd) {
- abort_socket_nonblocking(csd);
+ if (cs) {
+ shutdown_connection(cs);
}
- if (ptrans) {
- ap_queue_info_push_pool(worker_queue_info, ptrans);
+ else {
+ if (csd) {
+ close_socket_nonblocking(csd);
+ }
+ if (ptrans) {
+ ap_queue_info_push_pool(worker_queue_info, ptrans);
+ }
}
signal_threads(ST_GRACEFUL);
}
@@ -1537,8 +1539,8 @@ static timer_event_t * event_get_timer_event(apr_t
/* Okay, add sorted by when.. */
apr_skiplist_insert(timer_skiplist, te);
- /* Cheaply update the overall timers' next expiry according to
- * this event, if necessary.
+ /* Cheaply update the global timers_next_expiry with this event's
+ * if it expires before.
*/
next_expiry = timers_next_expiry;
if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) {
@@ -1657,10 +1659,13 @@ static apr_status_t event_register_poll_callback(a
}
/*
- * Close socket and clean up if remote closed its end while we were in
- * lingering close. Only to be called in the worker thread, and since it's
- * in immediate call stack, we can afford a comfortable buffer size to
- * consume data quickly.
+ * Flush data and close our side of the connection, then drain incoming data.
+ * If the latter would block put the connection in one of the linger timeout
+ * queues to be called back when ready, and repeat until it's closed by peer.
+ * Only to be called in the worker thread, and since it's in immediate call
+ * stack, we can afford a comfortable buffer size to consume data quickly.
+ * Pre-condition: cs is not in any timeout queue and not in the pollset,
+ * timeout_mutex is not locked
*/
#define LINGERING_BUF_SIZE (32 * 1024)
static void process_lingering_close(event_conn_state_t *cs)
@@ -1671,7 +1676,39 @@ static void process_lingering_close(event_conn_sta
apr_status_t rv;
struct timeout_queue *q;
- /* socket is already in non-blocking state */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "process lingering close from state %i", (int)cs->pub.state);
+ AP_DEBUG_ASSERT(cs->pub.state >= CONN_STATE_LINGER);
+
+ if (cs->pub.state == CONN_STATE_LINGER) {
+ apr_socket_timeout_set(csd, apr_time_from_sec(SECONDS_TO_LINGER));
+ if (ap_start_lingering_close(cs->c)) {
+ notify_suspend(cs);
+ abort_connection(cs);
+ return;
+ }
+
+ /* defer_lingering_close() may have bumped lingering_count already */
+ if (!cs->deferred_linger) {
+ apr_atomic_inc32(&lingering_count);
+ }
+ notify_suspend(cs);
+
+ cs->queue_timestamp = apr_time_now();
+ /*
+ * If some module requested a shortened waiting period, only wait for
+ * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
+ * DoS attacks.
+ */
+ if (apr_table_get(cs->c->notes, "short-lingering-close")) {
+ cs->pub.state = CONN_STATE_LINGER_SHORT;
+ }
+ else {
+ cs->pub.state = CONN_STATE_LINGER_NORMAL;
+ }
+ }
+
+ apr_socket_timeout_set(csd, 0);
do {
nbytes = sizeof(dummybuf);
rv = apr_socket_recv(csd, dummybuf, &nbytes);
@@ -1678,18 +1715,15 @@ static void process_lingering_close(event_conn_sta
} while (rv == APR_SUCCESS);
if (!APR_STATUS_IS_EAGAIN(rv)) {
- rv = apr_socket_close(csd);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ abort_connection(cs);
return;
}
- /* Re-queue the connection to come back when readable */
- cs->pfd.reqevents = APR_POLLIN;
- cs->pub.sense = CONN_SENSE_DEFAULT;
+ /* (Re)queue the connection to come back when readable */
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(q, cs);
+ TO_QUEUE_APPEND(q, cs, 1);
rv = apr_pollset_add(event_pollset, &cs->pfd);
if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
AP_DEBUG_ASSERT(0);
@@ -1697,20 +1731,18 @@ static void process_lingering_close(event_conn_sta
apr_thread_mutex_unlock(timeout_mutex);
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
"process_lingering_close: apr_pollset_add failure");
- rv = apr_socket_close(cs->pfd.desc.s);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ abort_connection(cs);
+ signal_threads(ST_GRACEFUL);
return;
}
apr_thread_mutex_unlock(timeout_mutex);
}
-/* call 'func' for all elements of 'q' with timeout less than 'timeout_time'.
+/* call 'func' for all elements of 'q' expired at 'expiry' (zero means all).
* Pre-condition: timeout_mutex must already be locked
* Post-condition: timeout_mutex will be locked again
*/
-static void process_timeout_queue(struct timeout_queue *q,
- apr_time_t timeout_time,
+static void process_timeout_queue(struct timeout_queue *q, apr_time_t expiry,
int (*func)(event_conn_state_t *))
{
apr_uint32_t total = 0, count;
@@ -1730,10 +1762,10 @@ static void process_lingering_close(event_conn_sta
while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
timeout_list)) {
/* Trash the entry if:
- * - no timeout_time was given (asked for all), or
+ * - no expiry was given (zero means all), or
* - it expired (according to the queue timeout), or
* - the system clock skewed in the past: no entry should be
- * registered above the given timeout_time (~now) + the queue
+ * registered above the given expiry (~now) + the queue
* timeout, we won't keep any here (eg. for centuries).
*
* Otherwise stop, no following entry will match thanks to the
@@ -1740,17 +1772,16 @@ static void process_lingering_close(event_conn_sta
* single timeout per queue (entries are added to the end!).
* This allows maintenance in O(1).
*/
- if (timeout_time
- && cs->queue_timestamp + qp->timeout > timeout_time
- && cs->queue_timestamp < timeout_time + qp->timeout) {
- /* Since this is the next expiring of this queue, update the
- * overall queues' next expiry if it's later than this one.
+ if (expiry && cs->queue_timestamp + qp->timeout > expiry
+ && cs->queue_timestamp < expiry + qp->timeout) {
+ /* Since this is the next expiring entry of this queue, update
+ * the global queues_next_expiry if it's later than this one.
*/
- apr_time_t q_expiry = cs->queue_timestamp + qp->timeout;
+ apr_time_t elem_expiry = cs->queue_timestamp + qp->timeout;
apr_time_t next_expiry = queues_next_expiry;
if (!next_expiry
- || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
- queues_next_expiry = q_expiry;
+ || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
+ queues_next_expiry = elem_expiry;
}
break;
}
@@ -1790,18 +1821,17 @@ static void process_lingering_close(event_conn_sta
apr_thread_mutex_lock(timeout_mutex);
}
-static void process_keepalive_queue(apr_time_t timeout_time)
+static void process_keepalive_queue(apr_time_t expiry)
{
/* If all workers are busy, we kill older keep-alive connections so
* that they may connect to another process.
*/
- if (!timeout_time) {
+ if (!expiry && *keepalive_q->total) {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
- "All workers are busy or dying, will close %u "
+ "All workers are busy or dying, will kill %u "
"keep-alive connections", *keepalive_q->total);
}
- process_timeout_queue(keepalive_q, timeout_time,
- start_lingering_close_nonblocking);
+ process_timeout_queue(keepalive_q, expiry, shutdown_connection);
}
static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
@@ -1831,9 +1861,9 @@ static void * APR_THREAD_FUNC listener_thread(apr_
timer_event_t *te;
const apr_pollfd_t *out_pfd;
apr_int32_t num = 0;
- apr_interval_time_t timeout_interval;
+ apr_interval_time_t timeout;
socket_callback_baton_t *user_chain;
- apr_time_t now, timeout_time;
+ apr_time_t now, expiry = -1;
int workers_were_busy = 0;
if (conns_this_child <= 0)
@@ -1840,10 +1870,20 @@ static void * APR_THREAD_FUNC listener_thread(apr_
check_infinite_requests();
if (listener_may_exit) {
- close_listeners(&closed);
+ int first_close = close_listeners(&closed);
+
if (terminate_mode == ST_UNGRACEFUL
|| apr_atomic_read32(&connection_count) == 0)
break;
+
+ /* Don't wait in poll() for the first close (i.e. dying now), we
+ * want to maintain the queues and schedule defer_linger_chain ASAP
+ * to kill kept-alive connections and shut down the workers and child
+ * faster.
+ */
+ if (first_close) {
+ goto do_maintenance;
+ }
}
if (APLOGtrace6(ap_server_conf)) {
@@ -1857,8 +1897,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_
"keep-alive: %d lingering: %d suspended: %u)",
apr_atomic_read32(&connection_count),
apr_atomic_read32(&clogged_count),
- *(volatile apr_uint32_t*)write_completion_q->total,
- *(volatile apr_uint32_t*)keepalive_q->total,
+ apr_atomic_read32(write_completion_q->total),
+ apr_atomic_read32(keepalive_q->total),
apr_atomic_read32(&lingering_count),
apr_atomic_read32(&suspended_count));
if (dying) {
@@ -1886,18 +1926,18 @@ static void * APR_THREAD_FUNC listener_thread(apr_
* up occurs, otherwise periodic checks (maintenance, shutdown, ...)
* must be performed.
*/
- timeout_interval = -1;
+ timeout = -1;
/* Push expired timers to a worker, the first remaining one determines
* the maximum time to poll() below, if any.
*/
- timeout_time = timers_next_expiry;
- if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) {
+ expiry = timers_next_expiry;
+ if (expiry && expiry < now + EVENT_FUDGE_FACTOR) {
apr_thread_mutex_lock(g_timer_skiplist_mtx);
while ((te = apr_skiplist_peek(timer_skiplist))) {
if (te->when > now + EVENT_FUDGE_FACTOR) {
timers_next_expiry = te->when;
- timeout_interval = te->when - now;
+ timeout = te->when - now;
break;
}
apr_skiplist_pop(timer_skiplist, NULL);
@@ -1921,37 +1961,40 @@ static void * APR_THREAD_FUNC listener_thread(apr_
}
/* Same for queues, use their next expiry, if any. */
- timeout_time = queues_next_expiry;
- if (timeout_time
- && (timeout_interval < 0
- || timeout_time <= now
- || timeout_interval > timeout_time - now)) {
- timeout_interval = timeout_time > now ? timeout_time - now : 1;
+ expiry = queues_next_expiry;
+ if (expiry
+ && (timeout < 0
+ || expiry <= now
+ || timeout > expiry - now)) {
+ timeout = expiry > now ? expiry - now : 0;
}
/* When non-wakeable, don't wait more than 100 ms, in any case. */
#define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100)
if (!listener_is_wakeable
- && (timeout_interval < 0
- || timeout_interval > NON_WAKEABLE_POLL_TIMEOUT)) {
- timeout_interval = NON_WAKEABLE_POLL_TIMEOUT;
+ && (timeout < 0
+ || timeout > NON_WAKEABLE_POLL_TIMEOUT)) {
+ timeout = NON_WAKEABLE_POLL_TIMEOUT;
}
+ else if (timeout > 0) {
+ /* apr_pollset_poll() might round down the timeout to milliseconds,
+ * let's forcibly round up here to never return before the timeout.
+ */
+ timeout = apr_time_from_msec(
+ apr_time_as_msec(timeout + apr_time_from_msec(1) - 1)
+ );
+ }
- rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "poll()ing with timeout=%" APR_TIME_T_FMT
+ " queues_timeout=%" APR_TIME_T_FMT
+ " timers_timeout=%" APR_TIME_T_FMT,
+ timeout, queues_next_expiry - now,
+ timers_next_expiry - now);
+
+ rc = apr_pollset_poll(event_pollset, timeout, &num, &out_pfd);
if (rc != APR_SUCCESS) {
- if (APR_STATUS_IS_EINTR(rc)) {
- /* Woken up, if we are exiting or listeners are disabled we
- * must fall through to kill kept-alive connections or test
- * whether listeners should be re-enabled. Otherwise we only
- * need to update timeouts (logic is above, so simply restart
- * the loop).
- */
- if (!listener_may_exit && !listeners_disabled()) {
- continue;
- }
- timeout_time = 0;
- }
- else if (!APR_STATUS_IS_TIMEUP(rc)) {
+ if (!APR_STATUS_IS_EINTR(rc) && !APR_STATUS_IS_TIMEUP(rc)) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
APLOGNO(03267)
"apr_pollset_poll failed. Attempting to "
@@ -1961,13 +2004,21 @@ static void * APR_THREAD_FUNC listener_thread(apr_
num = 0;
}
- if (listener_may_exit) {
- close_listeners(&closed);
- if (terminate_mode == ST_UNGRACEFUL
- || apr_atomic_read32(&connection_count) == 0)
- break;
+ if (APLOGtrace8(ap_server_conf)) {
+ now = apr_time_now();
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, rc, ap_server_conf,
+ "poll()ed with num=%u exit=%d/%d conns=%d"
+ " queues_timeout=%" APR_TIME_T_FMT
+ " timers_timeout=%" APR_TIME_T_FMT,
+ num, listener_may_exit, dying,
+ apr_atomic_read32(&connection_count),
+ queues_next_expiry - now, timers_next_expiry - now);
}
+ /* XXX possible optimization: stash the current time for use as
+ * r->request_time for new requests or queues maintenance
+ */
+
for (user_chain = NULL; num; --num, ++out_pfd) {
listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
if (pt->type == PT_CSD) {
@@ -2020,7 +2071,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_
AP_DEBUG_ASSERT(0);
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
APLOGNO(03094) "pollset remove failed");
- start_lingering_close_nonblocking(cs);
+ abort_connection(cs);
+ signal_threads(ST_GRACEFUL);
break;
}
@@ -2027,18 +2079,13 @@ static void * APR_THREAD_FUNC listener_thread(apr_
/* If we don't get a worker immediately (nonblocking), we
* close the connection; the client can re-connect to a
* different process for keepalive, and for lingering close
- * the connection will be reset so the choice is to favor
+ * the connection will be killed so the choice is to favor
* incoming/alive connections.
*/
get_worker(&have_idle_worker, blocking,
&workers_were_busy);
if (!have_idle_worker) {
- if (remove_from_q == cs->sc->ka_q) {
- start_lingering_close_nonblocking(cs);
- }
- else {
- stop_lingering_close(cs);
- }
+ shutdown_connection(cs);
}
else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
have_idle_worker = 0;
@@ -2174,18 +2221,20 @@ static void * APR_THREAD_FUNC listener_thread(apr_
push_timer2worker(te);
}
- /* XXX possible optimization: stash the current time for use as
- * r->request_time for new requests
- */
- /* We process the timeout queues here only when their overall next
- * expiry (read once above) is over. This happens accurately since
+do_maintenance:
+ /* We process the timeout queues here only when the global
+ * queues_next_expiry has passed. This happens accurately since
* adding to the queues (in workers) can only decrease this expiry,
* while latest ones are only taken into account here (in listener)
* during queues' processing, with the lock held. This works both
* with and without wake-ability.
*/
- if (timeout_time && timeout_time < (now = apr_time_now())) {
- /* handle timed out sockets */
+ expiry = queues_next_expiry;
+ if (expiry && expiry < (now = apr_time_now())) {
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "processing queues: expiry=%" APR_TIME_T_FMT,
+ expiry - now);
+
apr_thread_mutex_lock(timeout_mutex);
/* Processing all the queues below will recompute this. */
@@ -2198,26 +2247,43 @@ static void * APR_THREAD_FUNC listener_thread(apr_
else {
process_keepalive_queue(now);
}
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "ka_q processed: expiry=%" APR_TIME_T_FMT,
+ queues_next_expiry > now ? queues_next_expiry - now : 0);
+
/* Step 2: write completion timeouts */
process_timeout_queue(write_completion_q, now,
- start_lingering_close_nonblocking);
+ defer_lingering_close);
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "wc_q processed: expiry=%" APR_TIME_T_FMT,
+ queues_next_expiry > now ? queues_next_expiry - now : 0);
+
/* Step 3: (normal) lingering close completion timeouts */
- process_timeout_queue(linger_q, now,
- stop_lingering_close);
+ if (dying && linger_q->timeout > short_linger_q->timeout) {
+ /* Dying, force short timeout for normal lingering close */
+ linger_q->timeout = short_linger_q->timeout;
+ }
+ process_timeout_queue(linger_q, now, shutdown_connection);
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "linger_q processed: expiry=%" APR_TIME_T_FMT,
+ queues_next_expiry > now ? queues_next_expiry - now : 0);
+
/* Step 4: (short) lingering close completion timeouts */
- process_timeout_queue(short_linger_q, now,
- stop_lingering_close);
+ process_timeout_queue(short_linger_q, now, shutdown_connection);
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "short_linger_q processed: expiry=%" APR_TIME_T_FMT,
+ queues_next_expiry > now ? queues_next_expiry - now : 0);
apr_thread_mutex_unlock(timeout_mutex);
- ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
- ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
+ ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+ ps->write_completion = apr_atomic_read32(write_completion_q->total);
ps->connections = apr_atomic_read32(&connection_count);
ps->suspended = apr_atomic_read32(&suspended_count);
ps->lingering_close = apr_atomic_read32(&lingering_count);
}
else if ((workers_were_busy || dying)
- && *(volatile apr_uint32_t*)keepalive_q->total) {
+ && apr_atomic_read32(keepalive_q->total)) {
apr_thread_mutex_lock(timeout_mutex);
process_keepalive_queue(0); /* kill'em all \m/ */
apr_thread_mutex_unlock(timeout_mutex);
@@ -2249,7 +2315,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_
}
} /* listener main loop */
- close_listeners(&closed);
ap_queue_term(worker_queue);
apr_thread_exit(thd, APR_SUCCESS);
@@ -2397,15 +2462,9 @@ static void *APR_THREAD_FUNC worker_thread(apr_thr
continue;
}
cs->chain = NULL;
+ AP_DEBUG_ASSERT(cs->pub.state == CONN_STATE_LINGER);
worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
-#ifdef AP_DEBUG
- rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-#else
- apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
-#endif
- cs->pub.state = CONN_STATE_LINGER;
process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
worker_sockets[thread_slot] = NULL;
}
@@ -2571,7 +2630,7 @@ static void setup_threads_runtime(void)
AP_DEBUG_ASSERT(i < num_listensocks);
pfd = &listener_pollfd[i];
- pfd->reqevents = APR_POLLIN;
+ pfd->reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
pfd->desc_type = APR_POLL_SOCKET;
pfd->desc.s = lr->sd;
@@ -2703,13 +2762,17 @@ static void join_workers(apr_thread_t * listener,
*/
iter = 0;
- while (iter < 10 && !dying) {
+ while (!dying) {
+ apr_sleep(apr_time_from_msec(500));
+ if (dying || ++iter > 10) {
+ break;
+ }
/* listener has not stopped accepting yet */
- apr_sleep(apr_time_make(0, 500000));
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+ "listener has not stopped accepting yet (%d iter)", iter);
wakeup_listener();
- ++iter;
}
- if (iter >= 10) {
+ if (iter > 10) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00475)
"the listener thread didn't stop accepting");
}
@@ -2918,7 +2981,13 @@ static void child_main(int child_num_arg, int chil
* If the worker hasn't exited, then this blocks until
* they have (then cleans up).
*/
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "%s termination received, joining workers",
+ rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
join_workers(ts->listener, threads);
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "%s termination, workers joined, exiting",
+ rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
}
free(threads);
Index: server/connection.c
===================================================================
--- server/connection.c (revision 1891501)
+++ server/connection.c (working copy)
@@ -139,18 +139,12 @@ AP_DECLARE(int) ap_start_lingering_close(conn_rec
ap_flush_conn(c);
#ifdef NO_LINGCLOSE
- apr_socket_close(csd);
return 1;
#else
/* Shut down the socket for write, which will send a FIN
* to the peer.
*/
- if (c->aborted
- || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
- apr_socket_close(csd);
- return 1;
- }
- return 0;
+ return (c->aborted || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE));
#endif
}
@@ -162,6 +156,7 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c)
apr_socket_t *csd = ap_get_conn_socket(c);
if (ap_start_lingering_close(c)) {
+ apr_socket_close(csd);
return;
}
Index: server/mpm_fdqueue.c
===================================================================
--- server/mpm_fdqueue.c (revision 1891501)
+++ server/mpm_fdqueue.c (working copy)
@@ -493,6 +493,10 @@ static apr_status_t queue_interrupt(fd_queue_t *qu
{
apr_status_t rv;
+ if (queue->terminated) {
+ return APR_EOF;
+ }
+
if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
return rv;
}
Index: server/mpm_fdqueue.h
===================================================================
--- server/mpm_fdqueue.h (revision 1891501)
+++ server/mpm_fdqueue.h (working copy)
@@ -83,7 +83,7 @@ struct fd_queue_t
unsigned int out;
apr_thread_mutex_t *one_big_mutex;
apr_thread_cond_t *not_empty;
- int terminated;
+ volatile int terminated;
};
typedef struct fd_queue_t fd_queue_t;