Author: rhuijben
Date: Wed Nov 18 10:42:45 2015
New Revision: 1714965

URL: http://svn.apache.org/viewvc?rev=1714965&view=rev
Log:
Improve cleanup on incoming fcgi connections to allow using the same
connection for multiple requests with mod_proxy_fcgi.

* config_store.c
  (serf__config_store_remove_connection): Add comment.
  (serf__config_store_remove_client): New function.
  (serf__config_store_remove_host): Add comment.

* incoming.c
  (destroy_request): Rename to...
  (serf__incoming_request_destroy): ... this.
  (response_finished): Update caller.
  (read_from_client): Update caller. Don't remove listener from
    context here, but delay that to serf__incoming_update_pollset()
    where we can safely destroy the pool.
  (serf_incoming_create2): Remove init of variable.
  (serf__incoming_update_pollset): Destroy listener data when done.

* protocols/fcgi_protocol.c
  (fcgi_process): Update caller.
  (move_to_head): Remove function.
  (serf_fcgi__stream_get): Remove argument. (Given that clients typically don't
     use multiplexing... it is not worth optimizing for)
  (serf_fcgi__close_stream): New function.

* protocols/fcgi_protocol.h
  (serf_fcgi__stream_get): Remove argument.
  (serf_fcgi__close_stream): New function.
  (serf_fcgi__stream_destroy): New function.

* protocols/fcgi_stream.c
  (serf_fcgi_stream_data_t): Add serf_fcgi_stream_t to have everything in
    one allocation.
  (serf_fcgi__stream_create): Update init.
  (serf_fcgi__stream_destroy): New function.
  (close_stream): New function.
  (fcgi_stream_enqueue_response): Hook close_stream on destroying the done
    record.
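
* serf_private.h
  (serf__config_store_remove_client): New function.
  (serf_incoming_t): Remove ssltunnel_ostream.
  (serf__incoming_request_destroy): New function.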

Modified:
    serf/trunk/config_store.c
    serf/trunk/incoming.c
    serf/trunk/protocols/fcgi_protocol.c
    serf/trunk/protocols/fcgi_protocol.h
    serf/trunk/protocols/fcgi_stream.c
    serf/trunk/serf_private.h

Modified: serf/trunk/config_store.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/config_store.c?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/config_store.c (original)
+++ serf/trunk/config_store.c Wed Nov 18 10:42:45 2015
@@ -225,14 +225,22 @@ apr_status_t
 serf__config_store_remove_connection(serf__config_store_t config_store,
                                      serf_connection_t *conn)
 {
-    return APR_ENOTIMPL;
+    return APR_ENOTIMPL; /* Mem leak? */
 }
 
 apr_status_t
+serf__config_store_remove_client(serf__config_store_t config_store,
+                                 serf_incoming_t *client)
+{
+    return APR_ENOTIMPL; /* Mem leak? */
+}
+
+
+apr_status_t
 serf__config_store_remove_host(serf__config_store_t config_store,
                                const char *hostname_port)
 {
-    return APR_ENOTIMPL;
+    return APR_ENOTIMPL; /* Mem leak? */
 }
 
 /*** Config ***/

Modified: serf/trunk/incoming.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/incoming.c (original)
+++ serf/trunk/incoming.c Wed Nov 18 10:42:45 2015
@@ -113,14 +113,12 @@ static apr_status_t client_connected(ser
 }
 
 /* Destroy an incoming request and its resources */
-static apr_status_t destroy_request(serf_incoming_request_t *request)
+void serf__incoming_request_destroy(serf_incoming_request_t *request)
 {
     serf_incoming_t *incoming = request->incoming;
     apr_pool_destroy(request->pool);
 
     serf_bucket_mem_free(incoming->allocator, request);
-
-    return APR_SUCCESS;
 }
 
 /* Called when the response is completely written and the write bucket
@@ -129,14 +127,13 @@ static apr_status_t response_finished(vo
                                       apr_uint64_t bytes_written)
 {
     serf_incoming_request_t *request = baton;
-    apr_status_t status = APR_SUCCESS;
 
     request->response_finished = true;
 
     if (request->request_read && request->response_finished) {
-        status = destroy_request(request);
+        serf__incoming_request_destroy(request);
     }
-    return status;
+    return APR_SUCCESS;
 }
 
 static apr_status_t http1_enqueue_reponse(serf_incoming_request_t *request,
@@ -351,7 +348,7 @@ static apr_status_t read_from_client(ser
             client->current_request = NULL;
 
             if (rq->request_read && rq->response_finished) {
-                status = destroy_request(rq);
+                serf__incoming_request_destroy(rq);
             }
 
             /* Is the connection at eof or just the request? */
@@ -369,7 +366,6 @@ static apr_status_t read_from_client(ser
 
     {
         apr_pollfd_t tdesc = { 0 };
-        int i;
 
         /* Remove us from the pollset */
         tdesc.desc_type = APR_POLL_SOCKET;
@@ -378,17 +374,14 @@ static apr_status_t read_from_client(ser
         client->ctx->pollset_rm(client->ctx->pollset_baton,
                                 &tdesc, &client->baton);
 
-        /* And from the incommings list */
-        for (i = 0; i < client->ctx->incomings->nelts; i++) {
-            if (GET_INCOMING(client->ctx, i) == client) {
-                GET_INCOMING(client->ctx, i)
-                    = GET_INCOMING(client->ctx,
-                                   client->ctx->incomings->nelts - 1);
-                break;
-            }
-        }
-        client->ctx->incomings->nelts--;
         client->seen_in_pollset |= APR_POLLHUP; /* No more events */
+
+        /* Note that the client is done. The pool containing skt
+           and this listener will now be cleared from the context
+           handlers dirty pollset support */
+        client->skt = NULL;
+        client->dirty_conn = true;
+        client->ctx->dirty_pollset = true;
     }
 
     status = client->closed(client, client->closed_baton, status,
@@ -760,7 +753,6 @@ apr_status_t serf_incoming_create2(
     ic->stream = NULL;
     ic->ostream_head = NULL;
     ic->ostream_tail = NULL;
-    ic->ssltunnel_ostream = NULL;
 
     ic->protocol_baton = NULL;
     ic->perform_read = read_from_client;
@@ -872,24 +864,52 @@ apr_status_t serf_listener_create(
     return APR_SUCCESS;
 }
 
-apr_status_t serf__incoming_update_pollset(serf_incoming_t *incoming)
+apr_status_t serf__incoming_update_pollset(serf_incoming_t *client)
 {
-    serf_context_t *ctx = incoming->ctx;
+    serf_context_t *ctx = client->ctx;
     apr_status_t status;
     apr_pollfd_t desc = { 0 };
     bool data_waiting;
 
-    if (!incoming->skt) {
+    if (!client->skt) {
+        int cid;
+        /* We are in the process of being cleaned up. As we are not
+           in the event loop and already notified the close callback
+           we can now clear our pool and remove us from the context */
+
+        if (client->config)
+            serf__config_store_remove_client(ctx->config_store, client);
+
+        /* And from the incomings list: overwrite our slot with the tail
+           entry and shrink the array by one. */
+        for (cid = 0; cid < ctx->incomings->nelts; cid++) {
+            if (GET_INCOMING(ctx, cid) == client) {
+                GET_INCOMING(ctx, cid) =
+                                GET_INCOMING(ctx,
+                                             ctx->incomings->nelts - 1);
+                break;
+            }
+        }
+        ctx->incomings->nelts--;
+
+        /* Per the note in read_from_client() this pool holds the listener
+           itself, so CLIENT must not be dereferenced after this call. */
+        apr_pool_destroy(client->pool);
+
+        if (cid < ctx->incomings->nelts) {
+            /* Slot CID now holds the entry that was moved down from the
+               tail. We skipped updating the pollset on this item as we
+               moved it. Let's run it now.
+
+               When we were the tail entry ourselves nothing was moved and
+               slot CID is no longer part of the array, so it must not be
+               visited. */
+
+            return serf__incoming_update_pollset(GET_INCOMING(ctx, cid));
+        }
+
         return APR_SUCCESS;
     }
 
     /* Remove the socket from the poll set. */
     desc.desc_type = APR_POLL_SOCKET;
-    desc.desc.s = incoming->skt;
-    desc.reqevents = incoming->reqevents;
+    desc.desc.s = client->skt;
+    desc.reqevents = client->reqevents;
 
     status = ctx->pollset_rm(ctx->pollset_baton,
-                             &desc, &incoming->baton);
+                             &desc, &client->baton);
     if (status && !APR_STATUS_IS_NOTFOUND(status))
         return status;
 
@@ -897,7 +917,7 @@ apr_status_t serf__incoming_update_polls
     desc.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
 
     /* If we are not connected yet, we just want to know when we are */
-    if (incoming->wait_for_connect) {
+    if (client->wait_for_connect) {
         data_waiting = true;
         desc.reqevents |= APR_POLLOUT;
     }
@@ -909,7 +929,7 @@ apr_status_t serf__incoming_update_polls
            But it also has the nice side effect of removing references
            from the aggregate to requests that are done.
          */
-        if (incoming->vec_len) {
+        if (client->vec_len) {
             /* We still have vecs in the connection, which lifetime is
                managed by buckets inside client->ostream_head.
 
@@ -920,10 +940,7 @@ apr_status_t serf__incoming_update_polls
         else {
             serf_bucket_t *ostream;
 
-            ostream = incoming->ostream_head;
-
-            if (!ostream)
-              ostream = incoming->ssltunnel_ostream;
+            ostream = client->ostream_head;
 
             if (ostream) {
                 const char *dummy_data;
@@ -951,11 +968,11 @@ apr_status_t serf__incoming_update_polls
     }
 
     /* save our reqevents, so we can pass it in to remove later. */
-    incoming->reqevents = desc.reqevents;
+    client->reqevents = desc.reqevents;
 
     /* Note: even if we don't want to read/write this socket, we still
      * want to poll it for hangups and errors.
      */
     return ctx->pollset_add(ctx->pollset_baton,
-                            &desc, &incoming->baton);
+                            &desc, &client->baton);
 }

Modified: serf/trunk/protocols/fcgi_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 10:42:45 2015
@@ -226,13 +226,13 @@ static apr_status_t fcgi_process(serf_fc
             switch (frametype)
             {
                 case FCGI_FRAMETYPE(FCGI_V1, FCGI_BEGIN_REQUEST):
-                    stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+                    stream = serf_fcgi__stream_get(fcgi, sid, false);
 
                     if (stream) {
                         /* Stream must be new */
                         return SERF_ERROR_FCGI_PROTOCOL_ERROR;
                     }
-                    stream = serf_fcgi__stream_get(fcgi, sid, TRUE, TRUE);
+                    stream = serf_fcgi__stream_get(fcgi, sid, true);
 
                     remaining = (apr_size_t)serf_bucket_get_remaining(body);
                     if (remaining != sizeof(FCGI_BeginRequestBody)) {
@@ -254,7 +254,7 @@ static apr_status_t fcgi_process(serf_fc
                     process_bucket = body;
                     break;
                 case FCGI_FRAMETYPE(FCGI_V1, FCGI_PARAMS):
-                    stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+                    stream = serf_fcgi__stream_get(fcgi, sid, false);
                     if (!stream) {
                         return SERF_ERROR_FCGI_PROTOCOL_ERROR;
                     }
@@ -275,7 +275,7 @@ static apr_status_t fcgi_process(serf_fc
                     }
                     break;
                 case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDIN):
-                    stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+                    stream = serf_fcgi__stream_get(fcgi, sid, false);
                     if (!stream) {
                         return SERF_ERROR_FCGI_PROTOCOL_ERROR;
                     }
@@ -430,17 +430,10 @@ static apr_status_t fcgi_teardown(serf_f
     return APR_ENOTIMPL;
 }
 
-static void
-move_to_head(serf_fcgi_stream_t *stream)
-{
-    /* Not implemented yet */
-}
-
 serf_fcgi_stream_t *
 serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi,
                       apr_uint16_t streamid,
-                      bool create,
-                      bool move_first)
+                      bool create)
 {
     serf_fcgi_stream_t *stream;
 
@@ -450,12 +443,7 @@ serf_fcgi__stream_get(serf_fcgi_protocol
     for (stream = fcgi->first; stream; stream = stream->next)
     {
         if (stream->streamid == streamid)
-        {
-            if (move_first && stream != fcgi->first)
-                move_to_head(stream);
-
             return stream;
-        }
     }
 
     if (create)
@@ -476,6 +464,24 @@ serf_fcgi__stream_get(serf_fcgi_protocol
     return NULL;
 }
 
+/* Unlink STREAM from FCGI's doubly linked stream list and destroy it.
+   Other streams on the same connection stay linked and usable. */
+void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi,
+                             serf_fcgi_stream_t *stream)
+{
+    /* Detach from the predecessor, or advance the list head. */
+    if (stream->prev)
+        stream->prev->next = stream->next;
+    else
+        fcgi->first = stream->next;
+
+    /* Detach from the successor, or pull back the list tail. */
+    if (stream->next)
+        stream->next->prev = stream->prev;
+    else
+        fcgi->last = stream->prev;
+
+    /* When STREAM was the only entry, first and last are both NULL now. */
+    serf_fcgi__stream_destroy(stream);
+}
+
 apr_status_t serf_fcgi__setup_incoming_request(
     serf_incoming_request_t **in_request,
     serf_incoming_request_setup_t *req_setup,

Modified: serf/trunk/protocols/fcgi_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.h (original)
+++ serf/trunk/protocols/fcgi_protocol.h Wed Nov 18 10:42:45 2015
@@ -185,8 +185,7 @@ typedef apr_status_t(*serf_fcgi_processo
 /* From fcgi_protocol.c */
 serf_fcgi_stream_t * serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi,
                                            apr_uint16_t streamid,
-                                           bool create,
-                                           bool move_first);
+                                           bool create);
 
 
 apr_status_t serf_fcgi__setup_incoming_request(
@@ -199,6 +198,9 @@ apr_status_t serf_fcgi__enqueue_frame(se
                                       serf_bucket_t *frame,
                                       bool pump);
 
+void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi,
+                             serf_fcgi_stream_t *stream);
+
 
 /* From fcgi_stream.c */
 serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,
@@ -219,6 +221,7 @@ serf_bucket_t * serf_fcgi__stream_handle
                                                serf_config_t *config,
                                                serf_bucket_alloc_t *alloc);
 
+void serf_fcgi__stream_destroy(serf_fcgi_stream_t *stream);
 
 
 #ifdef __cplusplus

Modified: serf/trunk/protocols/fcgi_stream.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_stream.c (original)
+++ serf/trunk/protocols/fcgi_stream.c Wed Nov 18 10:42:45 2015
@@ -31,8 +31,11 @@
 #include "protocols/fcgi_buckets.h"
 #include "protocols/fcgi_protocol.h"
 
+/* Fully opaque variant of serf_fcgi_stream_t */
 struct serf_fcgi_stream_data_t
 {
+    serf_fcgi_stream_t stream_data;
+
     serf_bucket_t *req_agg;
     bool headers_eof;
     bool stdin_eof;
@@ -45,8 +48,12 @@ serf_fcgi_stream_t * serf_fcgi__stream_c
                                               apr_uint16_t streamid,
                                               serf_bucket_alloc_t *alloc)
 {
-    serf_fcgi_stream_t *stream = serf_bucket_mem_alloc(alloc,
-                                                        sizeof(*stream));
+    serf_fcgi_stream_t *stream;
+    serf_fcgi_stream_data_t *data = serf_bucket_mem_calloc(alloc,
+                                                           sizeof(*data));
+
+    stream = &data->stream_data;
+    stream->data = data;
 
     stream->fcgi = fcgi;
     stream->alloc = alloc;
@@ -54,12 +61,19 @@ serf_fcgi_stream_t * serf_fcgi__stream_c
 
     stream->next = stream->prev = NULL;
 
-    /* Delay creating this? */
-    stream->data = serf_bucket_mem_calloc(alloc, sizeof(*stream->data));
-
     return stream;
 }
 
+/* Destroy STREAM: tear down its incoming request, if one is attached,
+   and hand the stream's memory back to its bucket allocator. */
+void serf_fcgi__stream_destroy(serf_fcgi_stream_t * stream)
+{
+    if (stream->data->in_request)
+        serf__incoming_request_destroy(stream->data->in_request);
+
+
+    /* Destroy stream and stream->data. STREAM is the first member of the
+       serf_fcgi_stream_data_t allocated in serf_fcgi__stream_create(), so
+       this single free releases both. */
+    serf_bucket_mem_free(stream->alloc, stream);
+}
+
 /* Aggregate hold open callback for what requests will think is the
    actual body */
 static apr_status_t stream_agg_eof(void *baton,
@@ -73,6 +87,16 @@ static apr_status_t stream_agg_eof(void
     return APR_EOF;
 }
 
+/* Event-bucket callback that closes the stream passed as BATON by calling
+   serf_fcgi__close_stream(). BYTES_READ is ignored. Always returns
+   APR_SUCCESS.
+
+   NOTE(review): fcgi_stream_enqueue_response() passes this as the last
+   callback to serf__bucket_event_create() around the FCGI_END_REQUEST
+   frame; presumably that is the on-destroy hook - confirm against the
+   event bucket's documentation. */
+static apr_status_t close_stream(void *baton,
+                                 apr_uint64_t bytes_read)
+{
+    serf_fcgi_stream_t *stream = baton;
+
+    serf_fcgi__close_stream(stream->fcgi, stream);
+
+    return APR_SUCCESS;
+}
+
 static apr_status_t
 fcgi_stream_enqueue_response(serf_incoming_request_t *request,
                              void *enqueue_baton,
@@ -136,12 +160,15 @@ fcgi_stream_enqueue_response(serf_incomi
 
     /* Send end of request: FCGI_REQUEST_COMPLETE, exit code 0 */
     tmp = SERF_BUCKET_SIMPLE_STRING_LEN("\0\0\0\0\0\0\0\0", 8, alloc);
-    status = serf_fcgi__enqueue_frame(
-        stream->fcgi,
-        serf__bucket_fcgi_frame_create(tmp, stream->streamid,
-                                       FCGI_FRAMETYPE(FCGI_V1, 
FCGI_END_REQUEST),
-                                       false, false,
-                                       alloc), true);
+    tmp = serf__bucket_fcgi_frame_create(tmp, stream->streamid,
+                                         FCGI_FRAMETYPE(FCGI_V1, 
FCGI_END_REQUEST),
+                                         false, false,
+                                         alloc);
+
+    tmp = serf__bucket_event_create(tmp, stream, NULL, NULL,
+                                    close_stream, alloc);
+
+    status = serf_fcgi__enqueue_frame(stream->fcgi, tmp, true);
     return status;
 }
 

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov 18 10:42:45 2015
@@ -294,6 +294,10 @@ apr_status_t
 serf__config_store_remove_connection(serf__config_store_t config_store,
                                      serf_connection_t *conn);
 
+apr_status_t
+serf__config_store_remove_client(serf__config_store_t config_store,
+                                 serf_incoming_t *client);
+
 /* Cleans up all host specific configuration values */
 apr_status_t
 serf__config_store_remove_host(serf__config_store_t config_store,
@@ -407,9 +411,6 @@ struct serf_incoming_t {
     serf_bucket_t *ostream_head;
     serf_bucket_t *ostream_tail;
 
-    /* Aggregate bucket used to send the CONNECT request. */
-    serf_bucket_t *ssltunnel_ostream;
-
     serf_config_t *config;
 
     serf_bucket_t *proto_peek_bkt;
@@ -653,6 +654,7 @@ apr_status_t serf__process_listener(serf
 apr_status_t serf__incoming_update_pollset(serf_incoming_t *incoming);
 apr_status_t serf__incoming_client_flush(serf_incoming_t *client, bool pump);
 serf_incoming_request_t *serf__incoming_request_create(serf_incoming_t 
*client);
+void serf__incoming_request_destroy(serf_incoming_request_t *request);
 
 /* from outgoing.c */
 apr_status_t serf__open_connections(serf_context_t *ctx);


Reply via email to