Author: rhuijben Date: Wed Nov 18 10:42:45 2015 New Revision: 1714965 URL: http://svn.apache.org/viewvc?rev=1714965&view=rev Log: Improve cleanup on incoming fcgi connections to allow using the same connection for multiple requests with mod_proxy_fcgi.
* config_store.c (serf__config_store_remove_connection): Add comment. (serf__config_store_remove_client): New function. (serf__config_store_remove_host): Add comment. * incoming.c (destroy_request): Rename to... (serf__incoming_request_destroy): ... this. (response_finished): Update caller. (read_from_client): Update caller. Don't remove listener from context here, but delay that to serf__incoming_update_pollset() where we can safely destroy the pool. (serf_incoming_create2): Remove init of variable. (serf__incoming_update_pollset): Destroy listener data when done. * protocols/fcgi_protocol.c (fcgi_process): Update caller. (move_to_head): Remove function. (serf_fcgi__stream_get): Remove argument. (Given that clients typically don't use multiplexing... it is not worth optimizing for) (serf_fcgi__close_stream): New function. * protocols/fcgi_protocol.h (serf_fcgi__stream_get): New function. (serf_fcgi__close_stream): New function. (serf_fcgi__stream_destroy): New function. * protocols/fcgi_stream.c (serf_fcgi_stream_data_t): Add serf_fcgi_stream_t to have everything in one allocation. (serf_fcgi__stream_create): Update init. (serf_fcgi__stream_destroy): New function. (close_stream): New function. (fcgi_stream_enqueue_response): Hook close_stream on destroying the done record. 
* serf_private.h Modified: serf/trunk/config_store.c serf/trunk/incoming.c serf/trunk/protocols/fcgi_protocol.c serf/trunk/protocols/fcgi_protocol.h serf/trunk/protocols/fcgi_stream.c serf/trunk/serf_private.h Modified: serf/trunk/config_store.c URL: http://svn.apache.org/viewvc/serf/trunk/config_store.c?rev=1714965&r1=1714964&r2=1714965&view=diff ============================================================================== --- serf/trunk/config_store.c (original) +++ serf/trunk/config_store.c Wed Nov 18 10:42:45 2015 @@ -225,14 +225,22 @@ apr_status_t serf__config_store_remove_connection(serf__config_store_t config_store, serf_connection_t *conn) { - return APR_ENOTIMPL; + return APR_ENOTIMPL; /* Mem leak? */ } apr_status_t +serf__config_store_remove_client(serf__config_store_t config_store, + serf_incoming_t *client) +{ + return APR_ENOTIMPL; /* Mem leak? */ +} + + +apr_status_t serf__config_store_remove_host(serf__config_store_t config_store, const char *hostname_port) { - return APR_ENOTIMPL; + return APR_ENOTIMPL; /* Mem leak? 
*/ } /*** Config ***/ Modified: serf/trunk/incoming.c URL: http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1714965&r1=1714964&r2=1714965&view=diff ============================================================================== --- serf/trunk/incoming.c (original) +++ serf/trunk/incoming.c Wed Nov 18 10:42:45 2015 @@ -113,14 +113,12 @@ static apr_status_t client_connected(ser } /* Destroy an incoming request and its resources */ -static apr_status_t destroy_request(serf_incoming_request_t *request) +void serf__incoming_request_destroy(serf_incoming_request_t *request) { serf_incoming_t *incoming = request->incoming; apr_pool_destroy(request->pool); serf_bucket_mem_free(incoming->allocator, request); - - return APR_SUCCESS; } /* Called when the response is completely written and the write bucket @@ -129,14 +127,13 @@ static apr_status_t response_finished(vo apr_uint64_t bytes_written) { serf_incoming_request_t *request = baton; - apr_status_t status = APR_SUCCESS; request->response_finished = true; if (request->request_read && request->response_finished) { - status = destroy_request(request); + serf__incoming_request_destroy(request); } - return status; + return APR_SUCCESS; } static apr_status_t http1_enqueue_reponse(serf_incoming_request_t *request, @@ -351,7 +348,7 @@ static apr_status_t read_from_client(ser client->current_request = NULL; if (rq->request_read && rq->response_finished) { - status = destroy_request(rq); + serf__incoming_request_destroy(rq); } /* Is the connection at eof or just the request? 
*/ @@ -369,7 +366,6 @@ static apr_status_t read_from_client(ser { apr_pollfd_t tdesc = { 0 }; - int i; /* Remove us from the pollset */ tdesc.desc_type = APR_POLL_SOCKET; @@ -378,17 +374,14 @@ static apr_status_t read_from_client(ser client->ctx->pollset_rm(client->ctx->pollset_baton, &tdesc, &client->baton); - /* And from the incommings list */ - for (i = 0; i < client->ctx->incomings->nelts; i++) { - if (GET_INCOMING(client->ctx, i) == client) { - GET_INCOMING(client->ctx, i) - = GET_INCOMING(client->ctx, - client->ctx->incomings->nelts - 1); - break; - } - } - client->ctx->incomings->nelts--; client->seen_in_pollset |= APR_POLLHUP; /* No more events */ + + /* Note that the client is done. The pool containing skt + and this listener will now be cleared from the context + handlers dirty pollset support */ + client->skt = NULL; + client->dirty_conn = true; + client->ctx->dirty_pollset = true; } status = client->closed(client, client->closed_baton, status, @@ -760,7 +753,6 @@ apr_status_t serf_incoming_create2( ic->stream = NULL; ic->ostream_head = NULL; ic->ostream_tail = NULL; - ic->ssltunnel_ostream = NULL; ic->protocol_baton = NULL; ic->perform_read = read_from_client; @@ -872,24 +864,52 @@ apr_status_t serf_listener_create( return APR_SUCCESS; } -apr_status_t serf__incoming_update_pollset(serf_incoming_t *incoming) +apr_status_t serf__incoming_update_pollset(serf_incoming_t *client) { - serf_context_t *ctx = incoming->ctx; + serf_context_t *ctx = client->ctx; apr_status_t status; apr_pollfd_t desc = { 0 }; bool data_waiting; - if (!incoming->skt) { + if (!client->skt) { + int cid; + /* We are in the proces of being cleaned up. 
As we are not + in the event loop and already notified the close callback + we can now clear our pool and remove us from the context */ + + if (client->config) + serf__config_store_remove_client(ctx->config_store, client); + + /* And from the incommings list */ + for (cid = 0; cid < ctx->incomings->nelts; cid++) { + if (GET_INCOMING(ctx, cid) == client) { + GET_INCOMING(ctx, cid) = + GET_INCOMING(ctx, + ctx->incomings->nelts - 1); + break; + } + } + client->ctx->incomings->nelts--; + + apr_pool_destroy(client->pool); + + if (cid >= ctx->incomings->nelts) { + /* We skipped updating the pollset on this item as we moved it. + Let's run it now */ + + return serf__incoming_update_pollset(GET_INCOMING(ctx, cid)); + } + return APR_SUCCESS; } /* Remove the socket from the poll set. */ desc.desc_type = APR_POLL_SOCKET; - desc.desc.s = incoming->skt; - desc.reqevents = incoming->reqevents; + desc.desc.s = client->skt; + desc.reqevents = client->reqevents; status = ctx->pollset_rm(ctx->pollset_baton, - &desc, &incoming->baton); + &desc, &client->baton); if (status && !APR_STATUS_IS_NOTFOUND(status)) return status; @@ -897,7 +917,7 @@ apr_status_t serf__incoming_update_polls desc.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR; /* If we are not connected yet, we just want to know when we are */ - if (incoming->wait_for_connect) { + if (client->wait_for_connect) { data_waiting = true; desc.reqevents |= APR_POLLOUT; } @@ -909,7 +929,7 @@ apr_status_t serf__incoming_update_polls But it also has the nice side effect of removing references from the aggregate to requests that are done. */ - if (incoming->vec_len) { + if (client->vec_len) { /* We still have vecs in the connection, which lifetime is managed by buckets inside client->ostream_head. 
@@ -920,10 +940,7 @@ apr_status_t serf__incoming_update_polls else { serf_bucket_t *ostream; - ostream = incoming->ostream_head; - - if (!ostream) - ostream = incoming->ssltunnel_ostream; + ostream = client->ostream_head; if (ostream) { const char *dummy_data; @@ -951,11 +968,11 @@ apr_status_t serf__incoming_update_polls } /* save our reqevents, so we can pass it in to remove later. */ - incoming->reqevents = desc.reqevents; + client->reqevents = desc.reqevents; /* Note: even if we don't want to read/write this socket, we still * want to poll it for hangups and errors. */ return ctx->pollset_add(ctx->pollset_baton, - &desc, &incoming->baton); + &desc, &client->baton); } Modified: serf/trunk/protocols/fcgi_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714965&r1=1714964&r2=1714965&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.c (original) +++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 10:42:45 2015 @@ -226,13 +226,13 @@ static apr_status_t fcgi_process(serf_fc switch (frametype) { case FCGI_FRAMETYPE(FCGI_V1, FCGI_BEGIN_REQUEST): - stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE); + stream = serf_fcgi__stream_get(fcgi, sid, false); if (stream) { /* Stream must be new */ return SERF_ERROR_FCGI_PROTOCOL_ERROR; } - stream = serf_fcgi__stream_get(fcgi, sid, TRUE, TRUE); + stream = serf_fcgi__stream_get(fcgi, sid, true); remaining = (apr_size_t)serf_bucket_get_remaining(body); if (remaining != sizeof(FCGI_BeginRequestBody)) { @@ -254,7 +254,7 @@ static apr_status_t fcgi_process(serf_fc process_bucket = body; break; case FCGI_FRAMETYPE(FCGI_V1, FCGI_PARAMS): - stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE); + stream = serf_fcgi__stream_get(fcgi, sid, false); if (!stream) { return SERF_ERROR_FCGI_PROTOCOL_ERROR; } @@ -275,7 +275,7 @@ static apr_status_t fcgi_process(serf_fc } break; case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDIN): - 
stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE); + stream = serf_fcgi__stream_get(fcgi, sid, false); if (!stream) { return SERF_ERROR_FCGI_PROTOCOL_ERROR; } @@ -430,17 +430,10 @@ static apr_status_t fcgi_teardown(serf_f return APR_ENOTIMPL; } -static void -move_to_head(serf_fcgi_stream_t *stream) -{ - /* Not implemented yet */ -} - serf_fcgi_stream_t * serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi, apr_uint16_t streamid, - bool create, - bool move_first) + bool create) { serf_fcgi_stream_t *stream; @@ -450,12 +443,7 @@ serf_fcgi__stream_get(serf_fcgi_protocol for (stream = fcgi->first; stream; stream = stream->next) { if (stream->streamid == streamid) - { - if (move_first && stream != fcgi->first) - move_to_head(stream); - return stream; - } } if (create) @@ -476,6 +464,24 @@ serf_fcgi__stream_get(serf_fcgi_protocol return NULL; } +void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi, + serf_fcgi_stream_t *stream) +{ + if (!stream->prev) + fcgi->first = stream->next; + else + stream->prev->next = stream; + + if (stream->next) + stream->next->prev = stream->prev; + else + fcgi->last = stream->prev; + + fcgi->first = fcgi->last = NULL; + + serf_fcgi__stream_destroy(stream); +} + apr_status_t serf_fcgi__setup_incoming_request( serf_incoming_request_t **in_request, serf_incoming_request_setup_t *req_setup, Modified: serf/trunk/protocols/fcgi_protocol.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714965&r1=1714964&r2=1714965&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.h (original) +++ serf/trunk/protocols/fcgi_protocol.h Wed Nov 18 10:42:45 2015 @@ -185,8 +185,7 @@ typedef apr_status_t(*serf_fcgi_processo /* From fcgi_protocol.c */ serf_fcgi_stream_t * serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi, apr_uint16_t streamid, - bool create, - bool move_first); + bool create); apr_status_t serf_fcgi__setup_incoming_request( @@ -199,6 +198,9 @@ 
apr_status_t serf_fcgi__enqueue_frame(se serf_bucket_t *frame, bool pump); +void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi, + serf_fcgi_stream_t *stream); + /* From fcgi_stream.c */ serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi, @@ -219,6 +221,7 @@ serf_bucket_t * serf_fcgi__stream_handle serf_config_t *config, serf_bucket_alloc_t *alloc); +void serf_fcgi__stream_destroy(serf_fcgi_stream_t *stream); #ifdef __cplusplus Modified: serf/trunk/protocols/fcgi_stream.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714965&r1=1714964&r2=1714965&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_stream.c (original) +++ serf/trunk/protocols/fcgi_stream.c Wed Nov 18 10:42:45 2015 @@ -31,8 +31,11 @@ #include "protocols/fcgi_buckets.h" #include "protocols/fcgi_protocol.h" +/* Fully opaque variant of serf_fcgi_stream_t */ struct serf_fcgi_stream_data_t { + serf_fcgi_stream_t stream_data; + serf_bucket_t *req_agg; bool headers_eof; bool stdin_eof; @@ -45,8 +48,12 @@ serf_fcgi_stream_t * serf_fcgi__stream_c apr_uint16_t streamid, serf_bucket_alloc_t *alloc) { - serf_fcgi_stream_t *stream = serf_bucket_mem_alloc(alloc, - sizeof(*stream)); + serf_fcgi_stream_t *stream; + serf_fcgi_stream_data_t *data = serf_bucket_mem_calloc(alloc, + sizeof(*data)); + + stream = &data->stream_data; + stream->data = data; stream->fcgi = fcgi; stream->alloc = alloc; @@ -54,12 +61,19 @@ serf_fcgi_stream_t * serf_fcgi__stream_c stream->next = stream->prev = NULL; - /* Delay creating this? 
*/ - stream->data = serf_bucket_mem_calloc(alloc, sizeof(*stream->data)); - return stream; } +void serf_fcgi__stream_destroy(serf_fcgi_stream_t * stream) +{ + if (stream->data->in_request) + serf__incoming_request_destroy(stream->data->in_request); + + + /* Destroy stream and stream->data */ + serf_bucket_mem_free(stream->alloc, stream); +} + /* Aggregate hold open callback for what requests will think is the actual body */ static apr_status_t stream_agg_eof(void *baton, @@ -73,6 +87,16 @@ static apr_status_t stream_agg_eof(void return APR_EOF; } +static apr_status_t close_stream(void *baton, + apr_uint64_t bytes_read) +{ + serf_fcgi_stream_t *stream = baton; + + serf_fcgi__close_stream(stream->fcgi, stream); + + return APR_SUCCESS; +} + static apr_status_t fcgi_stream_enqueue_response(serf_incoming_request_t *request, void *enqueue_baton, @@ -136,12 +160,15 @@ fcgi_stream_enqueue_response(serf_incomi /* Send end of request: FCGI_REQUEST_COMPLETE, exit code 0 */ tmp = SERF_BUCKET_SIMPLE_STRING_LEN("\0\0\0\0\0\0\0\0", 8, alloc); - status = serf_fcgi__enqueue_frame( - stream->fcgi, - serf__bucket_fcgi_frame_create(tmp, stream->streamid, - FCGI_FRAMETYPE(FCGI_V1, FCGI_END_REQUEST), - false, false, - alloc), true); + tmp = serf__bucket_fcgi_frame_create(tmp, stream->streamid, + FCGI_FRAMETYPE(FCGI_V1, FCGI_END_REQUEST), + false, false, + alloc); + + tmp = serf__bucket_event_create(tmp, stream, NULL, NULL, + close_stream, alloc); + + status = serf_fcgi__enqueue_frame(stream->fcgi, tmp, true); return status; } Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714965&r1=1714964&r2=1714965&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Wed Nov 18 10:42:45 2015 @@ -294,6 +294,10 @@ apr_status_t serf__config_store_remove_connection(serf__config_store_t config_store, serf_connection_t *conn); +apr_status_t 
+serf__config_store_remove_client(serf__config_store_t config_store, + serf_incoming_t *client); + /* Cleans up all host specific configuration values */ apr_status_t serf__config_store_remove_host(serf__config_store_t config_store, @@ -407,9 +411,6 @@ struct serf_incoming_t { serf_bucket_t *ostream_head; serf_bucket_t *ostream_tail; - /* Aggregate bucket used to send the CONNECT request. */ - serf_bucket_t *ssltunnel_ostream; - serf_config_t *config; serf_bucket_t *proto_peek_bkt; @@ -653,6 +654,7 @@ apr_status_t serf__process_listener(serf apr_status_t serf__incoming_update_pollset(serf_incoming_t *incoming); apr_status_t serf__incoming_client_flush(serf_incoming_t *client, bool pump); serf_incoming_request_t *serf__incoming_request_create(serf_incoming_t *client); +void serf__incoming_request_destroy(serf_incoming_request_t *request); /* from outgoing.c */ apr_status_t serf__open_connections(serf_context_t *ctx);