Author: rhuijben Date: Wed Nov 18 19:01:54 2015 New Revision: 1715036 URL: http://svn.apache.org/viewvc?rev=1715036&view=rev Log: Use a standard method in the pump logic to add a bucket to the data to be written. This consolidates some code written for the http2 and fcgi protocols.
* outgoing.c (write_to_connection): Use serf_pump__add_output. * protocols/fcgi_protocol.c (serf_fcgi_protocol_t): Just store pump instead of streams. (fcgi_process): Update usage. (serf_fcgi__enqueue_frame): Use standard code. (fcgi_outgoing_read): Remove now unneeded code. (serf__fcgi_protocol_init): Simplify init. (fcgi_server_read, serf__fcgi_protocol_init_server): Simplify. * protocols/fcgi_protocol.h (serf_fcgi__enqueue_frame): Tweak argument name * protocols/http2_protocol.c (serf_http2_protocol_t): Store pump instead of streams. (serf__http2_protocol_init, serf__http2_protocol_init_server): Simplify init. (serf_http2__enqueue_frame): Wrap standard code. (http2_process): Use stream from pump. (http2_outgoing_read, http2_incoming_read): Remove init. * protocols/http2_protocol.h (serf_http2__enqueue_frame): Tweak argument name * pump.c (serf_pump__write): Set pollset dirty if receiving EAGAIN, and not already asking for writable. (serf_pump__add_output): New function. * serf_private.h (serf_pump__add_output): New function. Modified: serf/trunk/outgoing.c serf/trunk/protocols/fcgi_protocol.c serf/trunk/protocols/fcgi_protocol.h serf/trunk/protocols/http2_protocol.c serf/trunk/protocols/http2_protocol.h serf/trunk/pump.c serf/trunk/serf_private.h Modified: serf/trunk/outgoing.c URL: http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715036&r1=1715035&r2=1715036&view=diff ============================================================================== --- serf/trunk/outgoing.c (original) +++ serf/trunk/outgoing.c Wed Nov 18 19:01:54 2015 @@ -777,8 +777,7 @@ static apr_status_t write_to_connection( request_writing_done, request_writing_finished, conn->allocator); - serf_bucket_aggregate_append(conn->pump.ostream_tail, - event_bucket); + serf_pump__add_output(&conn->pump, event_bucket, false); } /* If we got some data, then deliver it. 
*/ Modified: serf/trunk/protocols/fcgi_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1715036&r1=1715035&r2=1715036&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.c (original) +++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 19:01:54 2015 @@ -40,14 +40,12 @@ typedef struct serf_fcgi_protocol_t serf_incoming_t *client; serf_io_baton_t *io; /* Low level connection */ + serf_pump_t *pump; apr_pool_t *pool; serf_bucket_alloc_t *allocator; serf_config_t *config; - serf_bucket_t *stream; - serf_bucket_t *ostream; - serf_fcgi_processor_t processor; void *processor_baton; @@ -175,7 +173,7 @@ static apr_status_t fcgi_process(serf_fc { SERF_FCGI_assert(!fcgi->in_frame); - body = serf__bucket_fcgi_unframe_create(fcgi->stream, + body = serf__bucket_fcgi_unframe_create(fcgi->pump->stream, fcgi->allocator); serf__bucket_fcgi_unframe_set_eof(body, @@ -347,52 +345,9 @@ static apr_status_t fcgi_read(serf_fcgi_ apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi, serf_bucket_t *frame, - bool pump) + bool flush) { - apr_status_t status; - bool want_write; - - if (!pump && !fcgi->io->dirty_conn) - { - const char *data; - apr_size_t len; - - /* Cheap check to see if we should request a write - event next time around */ - status = serf_bucket_peek(fcgi->ostream, &data, &len); - - if (SERF_BUCKET_READ_ERROR(status)) - { - serf_bucket_destroy(frame); - return status; - } - - if (len == 0) - { - serf_io__set_pollset_dirty(fcgi->io); - } - } - - serf_bucket_aggregate_append(fcgi->ostream, frame); - - if (!pump) - return APR_SUCCESS; - - /* Flush final output buffer (after ssl, etc.) 
*/ - if (fcgi->conn) - status = serf__connection_flush(fcgi->conn, TRUE); - else - status = serf__incoming_client_flush(fcgi->client, TRUE); - - want_write = APR_STATUS_IS_EAGAIN(status); - - if ((want_write && !(fcgi->io->reqevents & APR_POLLOUT)) - || (!want_write && (fcgi->io->reqevents & APR_POLLOUT))) - { - serf_io__set_pollset_dirty(fcgi->io); - } - - return status; + return serf_pump__add_output(fcgi->pump, frame, flush); } static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi) @@ -499,9 +454,6 @@ static apr_status_t fcgi_outgoing_read(s { serf_fcgi_protocol_t *fcgi = conn->protocol_baton; - if (!fcgi->stream) - fcgi->stream = conn->pump.stream; - return fcgi_read(fcgi); } @@ -537,8 +489,7 @@ void serf__fcgi_protocol_init(serf_conne fcgi->pool = protocol_pool; fcgi->conn = conn; fcgi->io = &conn->io; - fcgi->stream = conn->pump.stream; - fcgi->ostream = conn->pump.ostream_tail; + fcgi->pump = &conn->pump; fcgi->allocator = conn->allocator; fcgi->config = conn->config; @@ -561,11 +512,6 @@ static apr_status_t fcgi_server_read(ser { serf_fcgi_protocol_t *fcgi = client->protocol_baton; - if (! 
fcgi->stream) { - fcgi->stream = client->pump.stream; - fcgi->ostream = client->pump.ostream_tail; - } - return fcgi_read(fcgi); } @@ -573,11 +519,6 @@ static apr_status_t fcgi_server_write(se { serf_fcgi_protocol_t *fcgi = client->protocol_baton; - if (!fcgi->stream) { - fcgi->stream = client->pump.stream; - fcgi->ostream = client->pump.ostream_tail; - } - return fcgi_write(fcgi); } @@ -606,8 +547,7 @@ void serf__fcgi_protocol_init_server(ser fcgi->pool = protocol_pool; fcgi->client = client; fcgi->io = &client->io; - fcgi->stream = client->pump.stream; - fcgi->ostream = client->pump.ostream_tail; + fcgi->pump = &client->pump; fcgi->allocator = client->allocator; fcgi->config = client->config; Modified: serf/trunk/protocols/fcgi_protocol.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1715036&r1=1715035&r2=1715036&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.h (original) +++ serf/trunk/protocols/fcgi_protocol.h Wed Nov 18 19:01:54 2015 @@ -196,7 +196,7 @@ apr_status_t serf_fcgi__setup_incoming_r apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi, serf_bucket_t *frame, - bool pump); + bool flush); void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi, serf_fcgi_stream_t *stream); Modified: serf/trunk/protocols/http2_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1715036&r1=1715035&r2=1715036&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.c (original) +++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 19:01:54 2015 @@ -122,8 +122,8 @@ struct serf_http2_protocol_t serf_incoming_t *client; serf_io_baton_t *io; /* Low level connection */ + serf_pump_t *pump; - serf_bucket_t *stream, *ostream; serf_bucket_alloc_t *allocator; serf_http2_processor_t processor; @@ -239,8 +239,7 @@ void serf__http2_protocol_init(serf_conn 
h2->pool = protocol_pool; h2->conn = conn; h2->io = &conn->io; - h2->stream = conn->pump.stream; - h2->ostream = conn->pump.ostream_tail; + h2->pump = &conn->pump; h2->allocator = conn->allocator; h2->config = conn->config; @@ -289,7 +288,7 @@ void serf__http2_protocol_init(serf_conn /* Send the HTTP/2 Connection Preface */ tmp = SERF_BUCKET_SIMPLE_STRING(HTTP2_CONNECTION_PREFIX, h2->allocator); - serf_bucket_aggregate_append(h2->ostream, tmp); + serf_pump__add_output(h2->pump, tmp, false); /* And now a settings frame and a huge window */ { @@ -331,8 +330,7 @@ void serf__http2_protocol_init_server(se h2->pool = protocol_pool; h2->client = client; h2->io = &client->io; - h2->stream = client->pump.stream; - h2->ostream = client->pump.ostream_tail; + h2->pump = &client->pump; h2->allocator = client->allocator; h2->config = client->config; @@ -432,53 +430,9 @@ enqueue_http2_request(serf_http2_protoco apr_status_t serf_http2__enqueue_frame(serf_http2_protocol_t *h2, serf_bucket_t *frame, - int pump) + bool flush) { - apr_status_t status; - bool want_write; - - - if (!pump && !h2->io->dirty_conn) - { - const char *data; - apr_size_t len; - - /* Cheap check to see if we should request a write - event next time around */ - status = serf_bucket_peek(h2->ostream, &data, &len); - - if (SERF_BUCKET_READ_ERROR(status)) - { - serf_bucket_destroy(frame); - return status; - } - - if (len == 0) - { - serf_io__set_pollset_dirty(h2->io); - } - } - - serf_bucket_aggregate_append(h2->ostream, frame); - - if (!pump) - return APR_SUCCESS; - - /* Flush final output buffer (after ssl, etc.) 
*/ - if (h2->conn) - status = serf__connection_flush(h2->conn, TRUE); - else - status = serf__incoming_client_flush(h2->client, TRUE); - - want_write = APR_STATUS_IS_EAGAIN(status); - - if ((want_write && !(h2->io->reqevents & APR_POLLOUT)) - || (!want_write && (h2->io->reqevents & APR_POLLOUT))) - { - serf_io__set_pollset_dirty(h2->io); - } - - return status; + return serf_pump__add_output(h2->pump, frame, flush); } /* Implements serf_bucket_prefix_handler_t. @@ -1072,7 +1026,7 @@ http2_process(serf_http2_protocol_t *h2) SERF_H2_assert(!h2->in_frame); body = serf__bucket_http2_unframe_create( - h2->stream, + h2->pump->stream, h2->rl_max_framesize, h2->allocator); @@ -1650,9 +1604,6 @@ http2_outgoing_read(serf_connection_t *c serf_io__set_pollset_dirty(&conn->io); } - if (h2->stream == NULL) - h2->stream = conn->pump.stream; - status = http2_process(h2); if (!status) @@ -1765,8 +1716,6 @@ http2_incoming_read(serf_incoming_t *cli /* Peek buffer is now empty. Use actual stream */ serf_bucket_destroy(client->proto_peek_bkt); client->proto_peek_bkt = NULL; - - h2->stream = client->pump.stream; } if (APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) Modified: serf/trunk/protocols/http2_protocol.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1715036&r1=1715035&r2=1715036&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.h (original) +++ serf/trunk/protocols/http2_protocol.h Wed Nov 18 19:01:54 2015 @@ -157,7 +157,7 @@ typedef apr_status_t (* serf_http2_proce apr_status_t serf_http2__enqueue_frame(serf_http2_protocol_t *h2, serf_bucket_t *frame, - int pump); + bool flush); /* Creates a new stream */ serf_http2_stream_t * Modified: serf/trunk/pump.c URL: http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715036&r1=1715035&r2=1715036&view=diff ============================================================================== --- serf/trunk/pump.c 
(original) +++ serf/trunk/pump.c Wed Nov 18 19:01:54 2015 @@ -293,8 +293,18 @@ apr_status_t serf_pump__write(serf_pump_ return no_more_writes(pump); } - if (status || !fetch_new) + if (status || !fetch_new) { + + /* If we couldn't write everything that we tried, + make sure that we will receive a write event next time */ + if (APR_STATUS_IS_EAGAIN(status) + && !pump->io->dirty_conn + && !(pump->io->reqevents & APR_POLLOUT)) + { + serf_io__set_pollset_dirty(pump->io); + } return status; + } else if (read_status || pump->vec_len || pump->hit_eof) return read_status; @@ -340,3 +350,31 @@ apr_status_t serf_pump__write(serf_pump_ return status; } + +apr_status_t serf_pump__add_output(serf_pump_t *pump, + serf_bucket_t *bucket, + bool flush) +{ + if (!flush + && !pump->io->dirty_conn + && !pump->stop_writing + && !(pump->io->reqevents & APR_POLLOUT) + && !serf_pump__data_pending(pump)) + { + /* If not writing now, + * and not already dirty + * and nothing pending yet + Then mark the pollset dirty to trigger a write */ + + serf_io__set_pollset_dirty(pump->io); + } + + serf_bucket_aggregate_append(pump->ostream_tail, bucket); + + if (!flush) + return APR_SUCCESS; + + /* Flush final output buffer (after ssl, etc.) */ + return serf_pump__write(pump, TRUE); +} + Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715036&r1=1715035&r2=1715036&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Wed Nov 18 19:01:54 2015 @@ -762,6 +762,10 @@ void serf_pump__store_ipaddresses_in_con apr_status_t serf_pump__write(serf_pump_t *pump, bool fetch_new); +apr_status_t serf_pump__add_output(serf_pump_t *pump, + serf_bucket_t *bucket, + bool flush); + /* These must always be called as a pair to avoid a memory leak */ void serf_pump__prepare_setup(serf_pump_t *pump); void serf_pump__complete_setup(serf_pump_t *pump,