Author: rhuijben
Date: Wed Nov 18 19:01:54 2015
New Revision: 1715036

URL: http://svn.apache.org/viewvc?rev=1715036&view=rev
Log:
Use a standard method to the pump logic to add a bucket to the to be
written data. This consolidates some code written for the http2
and fcgi protocols.

* outgoing.c
  (write_to_connection): Use serf_pump__add_output.

* protocols/fcgi_protocol.c
  (serf_fcgi_protocol_t): Just store pump instead of streams.
  (fcgi_process): Update usage.
  (serf_fcgi__enqueue_frame): Use standard code.
  (fcgi_outgoing_read): Remove now unneeded code.
  (serf__fcgi_protocol_init): Simplify init.
  (fcgi_server_read,
   serf__fcgi_protocol_init_server): Simplify.

* protocols/fcgi_protocol.h
  (serf_fcgi__enqueue_frame): Tweak argument name

* protocols/http2_protocol.c
  (serf_http2_protocol_t): Store pump instead of streams.
  (serf__http2_protocol_init,
   serf__http2_protocol_init_server): Simplify init.
  (serf_http2__enqueue_frame): Wrap standard code.
  (http2_process): Use stream from pump.
  (http2_outgoing_read,
   http2_incoming_read): Remove init.

* protocols/http2_protocol.h
  (serf_http2__enqueue_frame): Tweak argument name

* pump.c
  (serf_pump__write): Set pollset dirty if receiving EAGAIN,
    and not already asking for writable.
  (serf_pump__add_output): New function.

* serf_private.h
  (serf_pump__add_output): New function.

Modified:
    serf/trunk/outgoing.c
    serf/trunk/protocols/fcgi_protocol.c
    serf/trunk/protocols/fcgi_protocol.h
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/protocols/http2_protocol.h
    serf/trunk/pump.c
    serf/trunk/serf_private.h

Modified: serf/trunk/outgoing.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Wed Nov 18 19:01:54 2015
@@ -777,8 +777,7 @@ static apr_status_t write_to_connection(
                                                      request_writing_done,
                                                      request_writing_finished,
                                                      conn->allocator);
-            serf_bucket_aggregate_append(conn->pump.ostream_tail,
-                                         event_bucket);
+            serf_pump__add_output(&conn->pump, event_bucket, false);
         }
 
         /* If we got some data, then deliver it. */

Modified: serf/trunk/protocols/fcgi_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 19:01:54 2015
@@ -40,14 +40,12 @@ typedef struct serf_fcgi_protocol_t
     serf_incoming_t *client;
 
     serf_io_baton_t *io; /* Low level connection */
+    serf_pump_t *pump;
 
     apr_pool_t *pool;
     serf_bucket_alloc_t *allocator;
     serf_config_t *config;
 
-    serf_bucket_t *stream;
-    serf_bucket_t *ostream;
-
     serf_fcgi_processor_t processor;
     void *processor_baton;
 
@@ -175,7 +173,7 @@ static apr_status_t fcgi_process(serf_fc
         {
             SERF_FCGI_assert(!fcgi->in_frame);
 
-            body = serf__bucket_fcgi_unframe_create(fcgi->stream,
+            body = serf__bucket_fcgi_unframe_create(fcgi->pump->stream,
                                                     fcgi->allocator);
 
             serf__bucket_fcgi_unframe_set_eof(body,
@@ -347,52 +345,9 @@ static apr_status_t fcgi_read(serf_fcgi_
 
 apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi,
                                       serf_bucket_t *frame,
-                                      bool pump)
+                                      bool flush)
 {
-    apr_status_t status;
-    bool want_write;
-
-    if (!pump && !fcgi->io->dirty_conn)
-    {
-        const char *data;
-        apr_size_t len;
-
-        /* Cheap check to see if we should request a write
-        event next time around */
-        status = serf_bucket_peek(fcgi->ostream, &data, &len);
-
-        if (SERF_BUCKET_READ_ERROR(status))
-        {
-            serf_bucket_destroy(frame);
-            return status;
-        }
-
-        if (len == 0)
-        {
-            serf_io__set_pollset_dirty(fcgi->io);
-        }
-    }
-
-    serf_bucket_aggregate_append(fcgi->ostream, frame);
-
-    if (!pump)
-        return APR_SUCCESS;
-
-    /* Flush final output buffer (after ssl, etc.) */
-    if (fcgi->conn)
-        status = serf__connection_flush(fcgi->conn, TRUE);
-    else
-        status = serf__incoming_client_flush(fcgi->client, TRUE);
-
-    want_write = APR_STATUS_IS_EAGAIN(status);
-
-    if ((want_write && !(fcgi->io->reqevents & APR_POLLOUT))
-        || (!want_write && (fcgi->io->reqevents & APR_POLLOUT)))
-    {
-        serf_io__set_pollset_dirty(fcgi->io);
-    }
-
-    return status;
+    return serf_pump__add_output(fcgi->pump, frame, flush);
 }
 
 static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi)
@@ -499,9 +454,6 @@ static apr_status_t fcgi_outgoing_read(s
 {
     serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
 
-    if (!fcgi->stream)
-        fcgi->stream = conn->pump.stream;
-
     return fcgi_read(fcgi);
 }
 
@@ -537,8 +489,7 @@ void serf__fcgi_protocol_init(serf_conne
     fcgi->pool = protocol_pool;
     fcgi->conn = conn;
     fcgi->io = &conn->io;
-    fcgi->stream = conn->pump.stream;
-    fcgi->ostream = conn->pump.ostream_tail;
+    fcgi->pump = &conn->pump;
     fcgi->allocator = conn->allocator;
     fcgi->config = conn->config;
 
@@ -561,11 +512,6 @@ static apr_status_t fcgi_server_read(ser
 {
     serf_fcgi_protocol_t *fcgi = client->protocol_baton;
 
-    if (! fcgi->stream) {
-        fcgi->stream = client->pump.stream;
-        fcgi->ostream = client->pump.ostream_tail;
-    }
-
     return fcgi_read(fcgi);
 }
 
@@ -573,11 +519,6 @@ static apr_status_t fcgi_server_write(se
 {
     serf_fcgi_protocol_t *fcgi = client->protocol_baton;
 
-    if (!fcgi->stream) {
-        fcgi->stream = client->pump.stream;
-        fcgi->ostream = client->pump.ostream_tail;
-    }
-
     return fcgi_write(fcgi);
 }
 
@@ -606,8 +547,7 @@ void serf__fcgi_protocol_init_server(ser
     fcgi->pool = protocol_pool;
     fcgi->client = client;
     fcgi->io = &client->io;
-    fcgi->stream = client->pump.stream;
-    fcgi->ostream = client->pump.ostream_tail;
+    fcgi->pump = &client->pump;
     fcgi->allocator = client->allocator;
     fcgi->config = client->config;
 

Modified: serf/trunk/protocols/fcgi_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.h (original)
+++ serf/trunk/protocols/fcgi_protocol.h Wed Nov 18 19:01:54 2015
@@ -196,7 +196,7 @@ apr_status_t serf_fcgi__setup_incoming_r
 
 apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi,
                                       serf_bucket_t *frame,
-                                      bool pump);
+                                      bool flush);
 
 void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi,
                              serf_fcgi_stream_t *stream);

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 19:01:54 2015
@@ -122,8 +122,8 @@ struct serf_http2_protocol_t
     serf_incoming_t *client;
 
     serf_io_baton_t *io; /* Low level connection */
+    serf_pump_t *pump;
 
-    serf_bucket_t *stream, *ostream;
     serf_bucket_alloc_t *allocator;
 
     serf_http2_processor_t processor;
@@ -239,8 +239,7 @@ void serf__http2_protocol_init(serf_conn
     h2->pool = protocol_pool;
     h2->conn = conn;
     h2->io = &conn->io;
-    h2->stream = conn->pump.stream;
-    h2->ostream = conn->pump.ostream_tail;
+    h2->pump = &conn->pump;
     h2->allocator = conn->allocator;
     h2->config = conn->config;
 
@@ -289,7 +288,7 @@ void serf__http2_protocol_init(serf_conn
 
     /* Send the HTTP/2 Connection Preface */
     tmp = SERF_BUCKET_SIMPLE_STRING(HTTP2_CONNECTION_PREFIX, h2->allocator);
-    serf_bucket_aggregate_append(h2->ostream, tmp);
+    serf_pump__add_output(h2->pump, tmp, false);
 
     /* And now a settings frame and a huge window */
     {
@@ -331,8 +330,7 @@ void serf__http2_protocol_init_server(se
     h2->pool = protocol_pool;
     h2->client = client;
     h2->io = &client->io;
-    h2->stream = client->pump.stream;
-    h2->ostream = client->pump.ostream_tail;
+    h2->pump = &client->pump;
     h2->allocator = client->allocator;
     h2->config = client->config;
 
@@ -432,53 +430,9 @@ enqueue_http2_request(serf_http2_protoco
 apr_status_t
 serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
                           serf_bucket_t *frame,
-                          int pump)
+                          bool flush)
 {
-    apr_status_t status;
-    bool want_write;
-
-
-    if (!pump && !h2->io->dirty_conn)
-    {
-        const char *data;
-        apr_size_t len;
-
-        /* Cheap check to see if we should request a write
-           event next time around */
-        status = serf_bucket_peek(h2->ostream, &data, &len);
-
-        if (SERF_BUCKET_READ_ERROR(status))
-        {
-            serf_bucket_destroy(frame);
-            return status;
-        }
-
-        if (len == 0)
-        {
-            serf_io__set_pollset_dirty(h2->io);
-        }
-    }
-
-    serf_bucket_aggregate_append(h2->ostream, frame);
-
-    if (!pump)
-        return APR_SUCCESS;
-
-      /* Flush final output buffer (after ssl, etc.) */
-    if (h2->conn)
-        status = serf__connection_flush(h2->conn, TRUE);
-    else
-        status = serf__incoming_client_flush(h2->client, TRUE);
-
-    want_write = APR_STATUS_IS_EAGAIN(status);
-
-    if ((want_write && !(h2->io->reqevents & APR_POLLOUT))
-        || (!want_write && (h2->io->reqevents & APR_POLLOUT)))
-    {
-        serf_io__set_pollset_dirty(h2->io);
-    }
-
-    return status;
+    return serf_pump__add_output(h2->pump, frame, flush);
 }
 
 /* Implements serf_bucket_prefix_handler_t.
@@ -1072,7 +1026,7 @@ http2_process(serf_http2_protocol_t *h2)
             SERF_H2_assert(!h2->in_frame);
 
             body = serf__bucket_http2_unframe_create(
-                h2->stream,
+                h2->pump->stream,
                 h2->rl_max_framesize,
                 h2->allocator);
 
@@ -1650,9 +1604,6 @@ http2_outgoing_read(serf_connection_t *c
         serf_io__set_pollset_dirty(&conn->io);
     }
 
-    if (h2->stream == NULL)
-        h2->stream = conn->pump.stream;
-
     status = http2_process(h2);
 
     if (!status)
@@ -1765,8 +1716,6 @@ http2_incoming_read(serf_incoming_t *cli
             /* Peek buffer is now empty. Use actual stream */
             serf_bucket_destroy(client->proto_peek_bkt);
             client->proto_peek_bkt = NULL;
-
-            h2->stream = client->pump.stream;
         }
 
         if (APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN)

Modified: serf/trunk/protocols/http2_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Wed Nov 18 19:01:54 2015
@@ -157,7 +157,7 @@ typedef apr_status_t (* serf_http2_proce
 apr_status_t
 serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
                           serf_bucket_t *frame,
-                          int pump);
+                          bool flush);
 
 /* Creates a new stream */
 serf_http2_stream_t *

Modified: serf/trunk/pump.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/pump.c (original)
+++ serf/trunk/pump.c Wed Nov 18 19:01:54 2015
@@ -293,8 +293,18 @@ apr_status_t serf_pump__write(serf_pump_
               return no_more_writes(pump);
         }
 
-        if (status || !fetch_new)
+        if (status || !fetch_new) {
+
+            /* If we couldn't write everything that we tried,
+               make sure that we will receive a write event next time */
+            if (APR_STATUS_IS_EAGAIN(status)
+                && !pump->io->dirty_conn
+                && !(pump->io->reqevents & APR_POLLOUT))
+            {
+                serf_io__set_pollset_dirty(pump->io);
+            }
             return status;
+        }
         else if (read_status || pump->vec_len || pump->hit_eof)
             return read_status;
 
@@ -340,3 +350,31 @@ apr_status_t serf_pump__write(serf_pump_
 
     return status;
 }
+
+apr_status_t serf_pump__add_output(serf_pump_t *pump,
+                                   serf_bucket_t *bucket,
+                                   bool flush)
+{
+    if (!flush
+        && !pump->io->dirty_conn
+        && !pump->stop_writing
+        && !(pump->io->reqevents & APR_POLLOUT)
+        && !serf_pump__data_pending(pump))
+    {
+        /* If not writing now,
+           * and not already dirty
+           * and nothing pending yet
+           Then mark the pollset dirty to trigger a write */
+
+        serf_io__set_pollset_dirty(pump->io);
+    }
+
+    serf_bucket_aggregate_append(pump->ostream_tail, bucket);
+
+    if (!flush)
+        return APR_SUCCESS;
+
+    /* Flush final output buffer (after ssl, etc.) */
+    return serf_pump__write(pump, TRUE);
+}
+

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov 18 19:01:54 2015
@@ -762,6 +762,10 @@ void serf_pump__store_ipaddresses_in_con
 apr_status_t serf_pump__write(serf_pump_t *pump,
                               bool fetch_new);
 
+apr_status_t serf_pump__add_output(serf_pump_t *pump,
+                                   serf_bucket_t *bucket,
+                                   bool flush);
+
 /* These must always be called as a pair to avoid a memory leak */
 void serf_pump__prepare_setup(serf_pump_t *pump);
 void serf_pump__complete_setup(serf_pump_t *pump,


Reply via email to