Author: rhuijben Date: Wed Nov 18 14:45:58 2015 New Revision: 1715005 URL: http://svn.apache.org/viewvc?rev=1715005&view=rev Log: Introduce a 'pump' layer that contains the stream pumping logic that was originally only part of outgoing.c, but is now partially duplicated in incoming.c.
The implementation is currently directly copied (with svn history) from the outgoing connection, but so far this patch uses it only for the incoming connections. * io.c New file copied from outgoing.c. Removing parts that are not necessary and making things serf private where needed. * incoming.c (client_detect_eof): Remove function. (client_connected): Use several pump functions to avoid duplicated code. (http1_enqueue_reponse, perform_peek_protocol, read_from_client): Update usage. (socket_writev, no_more_writes): Remove functions. (serf__incoming_client_flush): Replace implementation with call to serf_pump__write(). (serf_incoming_set_framing_type): Update usage. (serf_incoming_create2): Init pump. (serf__incoming_update_pollset): Use data pending helper. * protocols/fcgi_protocol.c (fcgi_server_read, fcgi_server_write, fcgi_server_teardown): Update usage. * protocols/http2_protocol.c (serf__http2_protocol_init_server, http2_incoming_read): Update usage. * pump.c (pump_cleanup): New function. (serf_pump__init): New function. (data_pending): Turn into... (serf_pump__data_pending): ... this. (detect_eof): Use extended baton. Return final EOF. (do_conn_setup): Split into... (serf_pump__prepare_setup): ... this and (serf_pump__complete_setup): ... this. (serf__connection_flush): Tweak to... (serf_pump__write): ... this. * serf_private.h (serf_pump_io_t): New struct. 
Added: serf/trunk/pump.c - copied, changed from r1714992, serf/trunk/outgoing.c Modified: serf/trunk/incoming.c serf/trunk/protocols/fcgi_protocol.c serf/trunk/protocols/http2_protocol.c serf/trunk/serf_private.h Modified: serf/trunk/incoming.c URL: http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1715005&r1=1715004&r2=1715005&view=diff ============================================================================== --- serf/trunk/incoming.c (original) +++ serf/trunk/incoming.c Wed Nov 18 14:45:58 2015 @@ -28,76 +28,35 @@ #include "serf_private.h" -static apr_status_t client_detect_eof(void *baton, - serf_bucket_t *aggregator) -{ - serf_incoming_t *client = baton; - client->hit_eof = true; - return APR_EAGAIN; -} - static apr_status_t client_connected(serf_incoming_t *client) { /* serf_context_t *ctx = client->ctx; */ apr_status_t status; serf_bucket_t *ostream; - apr_sockaddr_t *sa; - if (apr_socket_addr_get(&sa, APR_LOCAL, client->skt) == APR_SUCCESS) { - char buf[48]; - if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) - serf_config_set_stringf(client->config, SERF_CONFIG_CONN_LOCALIP, - "%s:%d", buf, sa->port); - } - if (apr_socket_addr_get(&sa, APR_REMOTE, client->skt) == APR_SUCCESS) { - char buf[48]; - if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) - serf_config_set_stringf(client->config, SERF_CONFIG_CONN_REMOTEIP, - "%s:%d", buf, sa->port); - } + serf_pump__store_ipaddresses_in_config(&client->pump); serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, "socket for client 0x%x connected\n", client); /* ### Connection does auth setup here */ - if (client->ostream_head == NULL) { - client->ostream_head = serf_bucket_aggregate_create(client->allocator); - } - - if (client->ostream_tail == NULL) { - client->ostream_tail = serf_bucket_aggregate_create(client->allocator); - - serf_bucket_aggregate_hold_open(client->ostream_tail, - client_detect_eof, client); - } + serf_pump__prepare_setup(&client->pump); - ostream = client->ostream_tail; + 
ostream = client->pump.ostream_tail; status = client->setup(client->skt, - &client->stream, + &client->pump.stream, &ostream, client->setup_baton, client->pool); if (status) { - /* extra destroy here since it wasn't added to the head bucket yet. */ - serf_bucket_destroy(client->ostream_tail); + serf_pump__complete_setup(&client->pump, NULL); /* ### Cleanup! (serf__connection_pre_cleanup) */ return status; } - /* Share the configuration with all the buckets in the newly created output - chain (see PLAIN or ENCRYPTED scenario's), including the request buckets - created by the application (ostream_tail will handle this for us). */ - serf_bucket_set_config(client->ostream_head, client->config); - - /* Share the configuration with the ssl_decrypt and socket buckets. The - response buckets wrapping the ssl_decrypt/socket buckets won't get the - config automatically because they are upstream. */ - serf_bucket_set_config(client->stream, client->config); - - serf_bucket_aggregate_append(client->ostream_head, - ostream); + serf_pump__complete_setup(&client->pump, ostream); if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) { client->proto_peek_bkt = serf_bucket_aggregate_create( @@ -105,7 +64,7 @@ static apr_status_t client_connected(ser serf_bucket_aggregate_append( client->proto_peek_bkt, - serf_bucket_barrier_create(client->stream, + serf_bucket_barrier_create(client->pump.stream, client->allocator)); } @@ -140,7 +99,7 @@ static apr_status_t http1_enqueue_repons void *enqueue_baton, serf_bucket_t *bucket) { - serf_bucket_aggregate_append(request->incoming->ostream_tail, + serf_bucket_aggregate_append(request->incoming->pump.ostream_tail, serf__bucket_event_create(bucket, request, NULL, @@ -194,7 +153,7 @@ apr_status_t perform_peek_protocol(serf_ if (!peek_data) { - status = serf_bucket_peek(client->stream, &data, &len); + status = serf_bucket_peek(client->pump.stream, &data, &len); if (len > h2prefixlen) len = h2prefixlen; @@ -227,7 +186,7 @@ apr_status_t 
perform_peek_protocol(serf_ } do { - status = serf_bucket_read(client->stream, + status = serf_bucket_read(client->pump.stream, h2prefixlen - peek_data->read, &data, &len); @@ -314,7 +273,7 @@ static apr_status_t read_from_client(ser client->proto_peek_bkt = NULL; } else - read_bkt = serf_bucket_barrier_create(client->stream, + read_bkt = serf_bucket_barrier_create(client->pump.stream, client->allocator); status = client->req_setup(&rq->req_bkt, read_bkt, rq, @@ -355,7 +314,7 @@ static apr_status_t read_from_client(ser const char *data; apr_size_t len; - status = serf_bucket_peek(client->stream, &data, &len); + status = serf_bucket_peek(client->pump.stream, &data, &len); } } } @@ -390,144 +349,10 @@ static apr_status_t read_from_client(ser return status; } -static apr_status_t socket_writev(serf_incoming_t *client) -{ - apr_size_t written; - apr_status_t status; - - status = apr_socket_sendv(client->skt, client->vec, - client->vec_len, &written); - if (status && !APR_STATUS_IS_EAGAIN(status)) - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, - "socket_sendv error %d\n", status); - - /* did we write everything? */ - if (written) { - apr_size_t len = 0; - int i; - - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, - "--- socket_sendv: %d bytes. 
--\n", written); - - for (i = 0; i < client->vec_len; i++) { - len += client->vec[i].iov_len; - if (written < len) { - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, - "%.*s", client->vec[i].iov_len - (len - written), - client->vec[i].iov_base); - if (i) { - memmove(client->vec, &client->vec[i], - sizeof(struct iovec) * (client->vec_len - i)); - client->vec_len -= i; - } - client->vec[0].iov_base = (char *)client->vec[0].iov_base + (client->vec[0].iov_len - (len - written)); - client->vec[0].iov_len = len - written; - break; - } else { - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, - "%.*s", - client->vec[i].iov_len, client->vec[i].iov_base); - } - } - if (len == written) { - client->vec_len = 0; - } - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, "\n"); - - /* Log progress information */ - serf__context_progress_delta(client->ctx, 0, written); - } - - return status; -} - -static apr_status_t no_more_writes(serf_incoming_t *client) -{ - /* Note that we should hold new requests until we open our new socket. */ - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, - "stop writing on client 0x%x\n", client); - - /* Clear our iovec. */ - client->vec_len = 0; - - /* Update the pollset to know we don't want to write on this socket any - * more. - */ - serf_io__set_pollset_dirty(&client->io); - return APR_SUCCESS; -} - apr_status_t serf__incoming_client_flush(serf_incoming_t *client, bool pump) { - apr_status_t status = APR_SUCCESS; - apr_status_t read_status = APR_SUCCESS; - serf_bucket_t *ostreamh = client->ostream_head; - - client->hit_eof = FALSE; - - while (status == APR_SUCCESS) { - - /* First try to write out what is already stored in the - connection vecs. */ - while (client->vec_len && !status) { - status = socket_writev(client); - - /* If the write would have blocked, then we're done. - * Don't try to write anything else to the socket. 
- */ - if (APR_STATUS_IS_EPIPE(status) - || APR_STATUS_IS_ECONNRESET(status) - || APR_STATUS_IS_ECONNABORTED(status)) - return no_more_writes(client); - } - - if (status || !pump) - return status; - else if (read_status || client->vec_len || client->hit_eof) - return read_status; - - /* ### optimize at some point by using read_for_sendfile */ - /* TODO: now that read_iovec will effectively try to return as much - data as available, we probably don't want to read ALL_AVAIL, but - a lower number, like the size of one or a few TCP packets, the - available TCP buffer size ... */ - client->hit_eof = 0; - read_status = serf_bucket_read_iovec(ostreamh, - SERF_READ_ALL_AVAIL, - IOV_MAX, - client->vec, - &client->vec_len); - - if (read_status == SERF_ERROR_WAIT_CONN) { - /* The bucket told us that it can't provide more data until - more data is read from the socket. This normally happens - during a SSL handshake. - - We should avoid looking for writability for a while so - that (hopefully) something will appear in the bucket so - we can actually write something. otherwise, we could - end up in a CPU spin: socket wants something, but we - don't have anything (and keep returning EAGAIN) */ - client->stop_writing = true; - serf_io__set_pollset_dirty(&client->io); - - read_status = APR_EAGAIN; - } - else if (APR_STATUS_IS_EAGAIN(read_status)) { - - /* We read some stuff, but did we read everything ? */ - if (client->hit_eof) - read_status = APR_SUCCESS; - } - else if (SERF_BUCKET_READ_ERROR(read_status)) { - - /* Something bad happened. Propagate any errors. 
*/ - return read_status; - } - } - - return status; + return serf_pump__write(&client->pump, pump); } static apr_status_t write_to_client(serf_incoming_t *client) @@ -561,7 +386,7 @@ void serf_incoming_set_framing_type( if (client->skt) { serf_io__set_pollset_dirty(&client->io); - client->stop_writing = 0; + client->pump.stop_writing = false; /* Close down existing protocol */ if (client->protocol_baton && client->perform_teardown) { @@ -745,10 +570,16 @@ apr_status_t serf_incoming_create2( ic->closed = closed; ic->closed_baton = closed_baton; - /* A bucket wrapped around our socket (for reading responses). */ - ic->stream = NULL; - ic->ostream_head = NULL; - ic->ostream_tail = NULL; + /* Store the connection specific info in the configuration store */ + rv = serf__config_store_get_client_config(ctx, ic, &config, pool); + if (rv) { + apr_pool_destroy(ic->pool); + return rv; + } + ic->config = config; + + /* Prepare wrapping the socket with buckets. */ + serf_pump__init(&ic->pump, &ic->io, ic->skt, config, ic->allocator, ic->pool); ic->protocol_baton = NULL; ic->perform_read = read_from_client; @@ -762,14 +593,6 @@ apr_status_t serf_incoming_create2( ic->desc.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; ic->seen_in_pollset = 0; - /* Store the connection specific info in the configuration store */ - rv = serf__config_store_get_client_config(ctx, ic, &config, pool); - if (rv) { - apr_pool_destroy(ic->pool); - return rv; - } - ic->config = config; - rv = ctx->pollset_add(ctx->pollset_baton, &ic->desc, &ic->io); @@ -928,38 +751,8 @@ apr_status_t serf__incoming_update_polls But it also has the nice side effect of removing references from the aggregate to requests that are done. */ - if (client->vec_len) { - /* We still have vecs in the connection, which lifetime is - managed by buckets inside client->ostream_head. 
- - Don't touch ostream as that might destroy the vecs */ - - data_waiting = true; - } - else { - serf_bucket_t *ostream; - - ostream = client->ostream_head; - if (ostream) { - const char *dummy_data; - apr_size_t len; - - status = serf_bucket_peek(ostream, &dummy_data, &len); - - if (SERF_BUCKET_READ_ERROR(status) || len > 0) { - /* DATA or error waiting */ - data_waiting = TRUE; /* Error waiting */ - } - else if (! status || APR_STATUS_IS_EOF(status)) { - data_waiting = FALSE; - } - else - data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */ - } - else - data_waiting = FALSE; - } + data_waiting = serf_pump__data_pending(&client->pump); if (data_waiting) { desc.reqevents |= APR_POLLOUT; Modified: serf/trunk/protocols/fcgi_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1715005&r1=1715004&r2=1715005&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.c (original) +++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 14:45:58 2015 @@ -562,8 +562,8 @@ static apr_status_t fcgi_server_read(ser serf_fcgi_protocol_t *fcgi = client->protocol_baton; if (! 
fcgi->stream) { - fcgi->stream = client->stream; - fcgi->ostream = client->ostream_tail; + fcgi->stream = client->pump.stream; + fcgi->ostream = client->pump.ostream_tail; } return fcgi_read(fcgi); @@ -574,8 +574,8 @@ static apr_status_t fcgi_server_write(se serf_fcgi_protocol_t *fcgi = client->protocol_baton; if (!fcgi->stream) { - fcgi->stream = client->stream; - fcgi->ostream = client->ostream_tail; + fcgi->stream = client->pump.stream; + fcgi->ostream = client->pump.ostream_tail; } return fcgi_write(fcgi); @@ -606,8 +606,8 @@ void serf__fcgi_protocol_init_server(ser fcgi->pool = protocol_pool; fcgi->client = client; fcgi->io = &client->io; - fcgi->stream = client->stream; - fcgi->ostream = client->ostream_tail; + fcgi->stream = client->pump.stream; + fcgi->ostream = client->pump.ostream_tail; fcgi->allocator = client->allocator; fcgi->config = client->config; Modified: serf/trunk/protocols/http2_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1715005&r1=1715004&r2=1715005&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.c (original) +++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 14:45:58 2015 @@ -331,8 +331,8 @@ void serf__http2_protocol_init_server(se h2->pool = protocol_pool; h2->client = client; h2->io = &client->io; - h2->stream = client->stream; - h2->ostream = client->ostream_tail; + h2->stream = client->pump.stream; + h2->ostream = client->pump.ostream_tail; h2->allocator = client->allocator; h2->config = client->config; @@ -1728,9 +1728,9 @@ http2_incoming_read(serf_incoming_t *cli /* If the stop_writing flag was set on the connection, reset it now because there is some data to read. 
*/ - if (client->stop_writing) + if (client->pump.stop_writing) { - client->stop_writing = 0; + client->pump.stop_writing = false; serf_io__set_pollset_dirty(&client->io); } @@ -1740,7 +1740,7 @@ http2_incoming_read(serf_incoming_t *cli if (client->proto_peek_bkt) stream = client->proto_peek_bkt; else - stream = client->stream; + stream = client->pump.stream; do { const char *data; @@ -1766,7 +1766,7 @@ http2_incoming_read(serf_incoming_t *cli serf_bucket_destroy(client->proto_peek_bkt); client->proto_peek_bkt = NULL; - h2->stream = client->stream; + h2->stream = client->pump.stream; } if (APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) Copied: serf/trunk/pump.c (from r1714992, serf/trunk/outgoing.c) URL: http://svn.apache.org/viewvc/serf/trunk/pump.c?p2=serf/trunk/pump.c&p1=serf/trunk/outgoing.c&r1=1714992&r2=1715005&rev=1715005&view=diff ============================================================================== --- serf/trunk/outgoing.c (original) +++ serf/trunk/pump.c Wed Nov 18 14:45:58 2015 @@ -29,353 +29,111 @@ #include "serf_private.h" -/* forward definitions */ -static apr_status_t read_from_connection(serf_connection_t *conn); -static apr_status_t write_to_connection(serf_connection_t *conn); -static apr_status_t hangup_connection(serf_connection_t *conn); - -#define REQS_IN_PROGRESS(conn) \ - ((conn)->completed_requests - (conn)->completed_responses) - -/* cleanup for sockets */ -static apr_status_t clean_skt(void *data) +apr_status_t pump_cleanup(void *baton) { - serf_connection_t *conn = data; - apr_status_t status = APR_SUCCESS; + serf_pump_t *pump = baton; - if (conn->skt) { - status = apr_socket_close(conn->skt); - conn->skt = NULL; - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "closed socket, status %d\n", status); - serf_config_remove_value(conn->config, SERF_CONFIG_CONN_LOCALIP); - serf_config_remove_value(conn->config, SERF_CONFIG_CONN_REMOTEIP); + if (pump->ostream_head != NULL) { +#ifdef 
SERF_DEBUG_BUCKET_USE + serf__bucket_drain(conn->ostream_head); +#endif + serf_bucket_destroy(pump->ostream_head); + pump->ostream_head = NULL; + pump->ostream_tail = NULL; } - return status; + return APR_SUCCESS; } -/* cleanup for conns */ -static apr_status_t clean_conn(void *data) -{ - serf_connection_t *conn = data; - - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "cleaning up connection 0x%x\n", conn); - serf_connection_close(conn); +void serf_pump__init(serf_pump_t *pump, + serf_io_baton_t *io, + apr_socket_t *skt, + serf_config_t *config, + serf_bucket_alloc_t *allocator, + apr_pool_t *pool) +{ + memset(pump, 0, sizeof(*pump)); + + pump->io = io; + pump->allocator = allocator; + pump->config = config; + pump->skt = skt; - return APR_SUCCESS; + apr_pool_cleanup_register(pool, pump, pump_cleanup, + apr_pool_cleanup_null); } /* Safely check if there is still data pending on the connection, carefull to not accidentally make it invalid. */ -static int -data_pending(serf_connection_t *conn) +bool serf_pump__data_pending(serf_pump_t *pump) { - if (conn->vec_len > 0) + if (pump->vec_len > 0) return TRUE; /* We can't poll right now! 
*/ - if (conn->ostream_head) { - const char *dummy; + if (pump->ostream_head) { + const char *data; apr_size_t len; apr_status_t status; - status = serf_bucket_peek(conn->ostream_head, &dummy, - &len); + status = serf_bucket_peek(pump->ostream_head, &data, &len); if (!SERF_BUCKET_READ_ERROR(status)) { if (len > 0) { - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config, "Extra data to be written after sending complete " "requests.\n"); - return TRUE; + return true; } } else - return TRUE; /* Sure, we have data (an error) */ - } - - return FALSE; -} - -static int -request_pending(serf_request_t **next_req, serf_connection_t *conn) -{ - /* Prepare the next request */ - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_NONE - && (conn->pipelining || (!conn->pipelining && REQS_IN_PROGRESS(conn) == 0))) - { - /* Skip all requests that have been written completely but we're still - waiting for a response. */ - serf_request_t *request = conn->unwritten_reqs; - - if (next_req) - *next_req = request; - - if (request != NULL) { - return TRUE; - } + return true; /* Sure, we have data (an error) */ } - else if (next_req) - *next_req = NULL; - - return FALSE; -} - -/* Check if there is data waiting to be sent over the socket. This can happen - in two situations: - - The connection queue has atleast one request with unwritten data. - - All requests are written and the ssl layer wrote some data while reading - the response. This can happen when the server triggers a renegotiation, - e.g. after the first and only request on that connection was received. - Returns 1 if data is pending on CONN, NULL if not. - If NEXT_REQ is not NULL, it will be filled in with the next available request - with unwritten data. 
*/ -static int -request_or_data_pending(serf_request_t **next_req, serf_connection_t *conn) -{ - if (request_pending(next_req, conn)) - return TRUE; - - return data_pending(conn); -} - -/* Update the pollset for this connection. We tweak the pollset based on - * whether we want to read and/or write, given conditions within the - * connection. If the connection is not (yet) in the pollset, then it - * will be added. - */ -apr_status_t serf__conn_update_pollset(serf_connection_t *conn) -{ - serf_context_t *ctx = conn->ctx; - apr_status_t status; - apr_pollfd_t desc = { 0 }; - int data_waiting; - if (!conn->skt) { - return APR_SUCCESS; - } - - /* Remove the socket from the poll set. */ - desc.desc_type = APR_POLL_SOCKET; - desc.desc.s = conn->skt; - desc.reqevents = conn->io.reqevents; - - status = ctx->pollset_rm(ctx->pollset_baton, - &desc, &conn->io); - if (status && !APR_STATUS_IS_NOTFOUND(status)) - return status; - - /* Now put it back in with the correct read/write values. */ - desc.reqevents = APR_POLLHUP | APR_POLLERR; - - /* If we are not connected yet, we just want to know when we are */ - if (conn->wait_for_connect) { - data_waiting = TRUE; - desc.reqevents |= APR_POLLOUT; - } - else { - /* Directly look at the connection data. While this may look - more expensive than the cheap checks later this peek is - just checking a bit of ram. - - But it also has the nice side effect of removing references - from the aggregate to requests that are done. - */ - if (conn->vec_len) { - /* We still have vecs in the connection, which lifetime is - managed by buckets inside conn->ostream_head. 
- - Don't touch ostream as that might destroy the vecs */ - - data_waiting = (conn->state != SERF_CONN_CLOSING); - } - else { - serf_bucket_t *ostream; - - ostream = conn->ostream_head; - - if (!ostream) - ostream = conn->ssltunnel_ostream; - - if (ostream) { - const char *dummy_data; - apr_size_t len; - - status = serf_bucket_peek(ostream, &dummy_data, &len); - - if (SERF_BUCKET_READ_ERROR(status) || len > 0) { - /* DATA or error waiting */ - data_waiting = TRUE; /* Error waiting */ - } - else if (! status || APR_STATUS_IS_EOF(status)) { - data_waiting = FALSE; - } - else - data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */ - } - else - data_waiting = FALSE; - } - - if (data_waiting) { - desc.reqevents |= APR_POLLOUT; - } - } - - if ((conn->written_reqs || conn->unwritten_reqs) && - conn->state != SERF_CONN_INIT) { - /* If there are any outstanding events, then we want to read. */ - /* ### not true. we only want to read IF we have sent some data */ - desc.reqevents |= APR_POLLIN; - - /* Don't write if OpenSSL told us that it needs to read data first. */ - if (! conn->stop_writing && !data_waiting) { - - /* This check is duplicated in write_to_connection() */ - if ((conn->probable_keepalive_limit && - conn->completed_requests > conn->probable_keepalive_limit) || - (conn->max_outstanding_requests && - REQS_IN_PROGRESS(conn) >= conn->max_outstanding_requests)) { - - /* we wouldn't try to write any way right now. */ - } - else if (request_pending(NULL, conn)) { - desc.reqevents |= APR_POLLOUT; - } - } - } - - /* If we can have async responses, always look for something to read. */ - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_HTTP1 - || conn->async_responses) - { - desc.reqevents |= APR_POLLIN; - } - - /* save our reqevents, so we can pass it in to remove later. */ - conn->io.reqevents = desc.reqevents; - - /* Note: even if we don't want to read/write this socket, we still - * want to poll it for hangups and errors. 
- */ - return ctx->pollset_add(ctx->pollset_baton, - &desc, &conn->io); + return false; } -#ifdef SERF_DEBUG_BUCKET_USE - -/* Make sure all response buckets were drained. */ -static void check_buckets_drained(serf_connection_t *conn) +static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket) { - serf_request_t *request = conn->written_reqs; + serf_pump_t *pump = baton; + pump->hit_eof = true; - for ( ; request ; request = request->next ) { - if (request->resp_bkt != NULL) { - /* ### crap. can't do this. this allocator may have un-drained - * ### REQUEST buckets. - */ - /* serf_debug__entered_loop(request->resp_bkt->allocator); */ - /* ### for now, pretend we closed the conn (resets the tracking) */ - serf_debug__closed_conn(request->resp_bkt->allocator); - } + if (pump->done_writing) { + pump->ostream_tail = NULL; + return APR_EOF; } + else + return APR_EAGAIN; } -#endif - -/* Destroys all outstanding write information, to allow cleanup of subpools - that may still have data in these buckets to continue */ -void serf__connection_pre_cleanup(serf_connection_t *conn) +void serf_pump__prepare_setup(serf_pump_t *pump) { - serf_request_t *rq; - conn->vec_len = 0; - - if (conn->ostream_head != NULL) { -#ifdef SERF_DEBUG_BUCKET_USE - serf__bucket_drain(conn->ostream_head); -#endif - serf_bucket_destroy(conn->ostream_head); - conn->ostream_head = NULL; - conn->ostream_tail = NULL; - } - if (conn->ssltunnel_ostream != NULL) { - serf_bucket_destroy(conn->ssltunnel_ostream); - conn->ssltunnel_ostream = NULL; + if (pump->ostream_head == NULL) { + pump->ostream_head = serf_bucket_aggregate_create(pump->allocator); } - /* Tell all written request that they are free to destroy themselves */ - rq = conn->written_reqs; - while (rq != NULL) { - if (rq->writing == SERF_WRITING_STARTED - || rq->writing == SERF_WRITING_DONE) { + if (pump->ostream_tail == NULL) { + pump->ostream_tail = serf_bucket_aggregate_create(pump->allocator); - rq->writing = 
SERF_WRITING_FINISHED; - } - rq = rq->next; + serf_bucket_aggregate_hold_open(pump->ostream_tail, detect_eof, pump); } - - /* Destroy the requests that were queued up to destroy later */ - while ((rq = conn->done_reqs)) { - conn->done_reqs = rq->next; - - rq->writing = SERF_WRITING_FINISHED; - serf__destroy_request(rq); - } - conn->done_reqs = conn->done_reqs_tail = NULL; } -static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket) +void serf_pump__complete_setup(serf_pump_t *pump, + serf_bucket_t *ostream) { - serf_connection_t *conn = baton; - conn->hit_eof = 1; - return APR_EAGAIN; -} - -static apr_status_t do_conn_setup(serf_connection_t *conn) -{ - apr_status_t status; - serf_bucket_t *ostream; - - /* ### dunno what the hell this is about. this latency stuff got - ### added, and who knows whether it should stay... */ - conn->latency = apr_time_now() - conn->connect_time; - - if (conn->ostream_head == NULL) { - conn->ostream_head = serf_bucket_aggregate_create(conn->allocator); - } - - if (conn->ostream_tail == NULL) { - conn->ostream_tail = serf_bucket_aggregate_create(conn->allocator); - - serf_bucket_aggregate_hold_open(conn->ostream_tail, detect_eof, conn); - } - - ostream = conn->ostream_tail; - - status = (*conn->setup)(conn->skt, - &conn->stream, - &ostream, - conn->setup_baton, - conn->pool); - if (status) { - /* extra destroy here since it wasn't added to the head bucket yet. */ - serf_bucket_destroy(conn->ostream_tail); - serf__connection_pre_cleanup(conn); - return status; - } + if (ostream) + serf_bucket_aggregate_append(pump->ostream_head, ostream); + else + serf_bucket_aggregate_append(pump->ostream_head, pump->ostream_tail); /* Share the configuration with all the buckets in the newly created output chain (see PLAIN or ENCRYPTED scenario's), including the request buckets created by the application (ostream_tail will handle this for us). 
*/ - serf_bucket_set_config(conn->ostream_head, conn->config); + serf_bucket_set_config(pump->ostream_head, pump->config); /* Share the configuration with the ssl_decrypt and socket buckets. The response buckets wrapping the ssl_decrypt/socket buckets won't get the config automatically because they are upstream. */ - serf_bucket_set_config(conn->stream, conn->config); - - serf_bucket_aggregate_append(conn->ostream_head, - ostream); + serf_bucket_set_config(pump->stream, pump->config); /* We typically have one of two scenarios, based on whether the application decided to encrypt this connection: @@ -394,381 +152,53 @@ static apr_status_t do_conn_setup(serf_c where STREAM is an internal variant of AGGREGATE. */ - - return status; } -/* Set up the input and output stream buckets. - When a tunnel over an http proxy is needed, create a socket bucket and - empty aggregate bucket for sending and receiving unencrypted requests - over the socket. - - After the tunnel is there, or no tunnel was needed, ask the application - to create the input and output buckets, which should take care of the - [en/de]cryption. - */ - -static apr_status_t prepare_conn_streams(serf_connection_t *conn, - serf_bucket_t **ostreamt, - serf_bucket_t **ostreamh) +void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump) { - apr_status_t status; + apr_sockaddr_t *sa; - /* Do we need a SSL tunnel first? */ - if (conn->state == SERF_CONN_CONNECTED) { - /* If the connection does not have an associated bucket, then - * call the setup callback to get one. 
- */ - if (conn->stream == NULL) { - status = do_conn_setup(conn); - if (status) { - return status; - } - } - *ostreamt = conn->ostream_tail; - *ostreamh = conn->ostream_head; - } else if (conn->state == SERF_CONN_SETUP_SSLTUNNEL) { - - /* SSL tunnel needed and not set up yet, get a direct unencrypted - stream for this socket */ - if (conn->stream == NULL) { - conn->stream = serf_context_bucket_socket_create(conn->ctx, - conn->skt, - conn->allocator); - } - - /* Don't create the ostream bucket chain including the ssl_encrypt - bucket yet. This ensure the CONNECT request is sent unencrypted - to the proxy. */ - *ostreamt = *ostreamh = conn->ssltunnel_ostream; - } else { - /* SERF_CONN_CLOSING or SERF_CONN_INIT */ - - *ostreamt = conn->ostream_tail; - *ostreamh = conn->ostream_head; - } - - return APR_SUCCESS; -} - -static void store_ipaddresses_in_config(serf_config_t *config, - apr_socket_t *skt) -{ - apr_sockaddr_t *sa; - - if (apr_socket_addr_get(&sa, APR_LOCAL, skt) == APR_SUCCESS) { + if (apr_socket_addr_get(&sa, APR_LOCAL, pump->skt) == APR_SUCCESS) { char buf[48]; if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) - serf_config_set_stringf(config, SERF_CONFIG_CONN_LOCALIP, + serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_LOCALIP, "%s:%d", buf, sa->port); } - if (apr_socket_addr_get(&sa, APR_REMOTE, skt) == APR_SUCCESS) { + if (apr_socket_addr_get(&sa, APR_REMOTE, pump->skt) == APR_SUCCESS) { char buf[48]; if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) - serf_config_set_stringf(config, SERF_CONFIG_CONN_REMOTEIP, + serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_REMOTEIP, "%s:%d", buf, sa->port); } } -static apr_status_t connect_connection(serf_connection_t *conn) -{ - serf_context_t *ctx = conn->ctx; - apr_status_t status; - - store_ipaddresses_in_config(conn->config, conn->skt); - - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "socket for conn 0x%x connected\n", conn); - - /* If the authentication was already started on 
another connection, - prepare this connection (it might be possible to skip some - part of the handshaking). */ - if (ctx->proxy_address) { - status = serf__auth_setup_connection(PROXY, conn); - if (status) { - return status; - } - } - - status = serf__auth_setup_connection(HOST, conn); - if (status) - return status; - - /* Does this connection require a SSL tunnel over the proxy? */ - if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0) - serf__ssltunnel_connect(conn); - else { - conn->state = SERF_CONN_CONNECTED; - status = do_conn_setup(conn); - } - - return APR_SUCCESS; -} - -/* Create and connect sockets for any connections which don't have them - * yet. This is the core of our lazy-connect behavior. - */ -apr_status_t serf__open_connections(serf_context_t *ctx) -{ - int i; - - for (i = ctx->conns->nelts; i--; ) { - serf_connection_t *conn = GET_CONN(ctx, i); - apr_status_t status; - apr_socket_t *skt; - - conn->seen_in_pollset = 0; - - if (conn->skt != NULL) { -#ifdef SERF_DEBUG_BUCKET_USE - check_buckets_drained(conn); -#endif - continue; - } - - /* Delay opening until we have something to deliver! */ - if (conn->unwritten_reqs == NULL) { - continue; - } - - apr_pool_clear(conn->skt_pool); - status = apr_socket_create(&skt, conn->address->family, - SOCK_STREAM, -#if APR_MAJOR_VERSION > 0 - APR_PROTO_TCP, -#endif - conn->skt_pool); - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "created socket for conn 0x%x, status %d\n", conn, status); - if (status != APR_SUCCESS) - return status; - - apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, - apr_pool_cleanup_null); - - /* Set the socket to be non-blocking */ - if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS) - return status; - - /* Disable Nagle's algorithm */ - if ((status = apr_socket_opt_set(skt, - APR_TCP_NODELAY, 1)) != APR_SUCCESS) - return status; - - /* Configured. Store it into the connection now. 
*/ - conn->skt = skt; - - /* Remember time when we started connecting to server to calculate - network latency. */ - conn->connect_time = apr_time_now(); - - /* Now that the socket is set up, let's connect it. This should - * return immediately. - */ - status = apr_socket_connect(skt, conn->address); - if (status != APR_SUCCESS) { - if (!APR_STATUS_IS_EINPROGRESS(status)) - return status; - - /* Keep track of when we really connect */ - conn->wait_for_connect = TRUE; - } - - status = serf_config_set_string(conn->config, - SERF_CONFIG_CONN_PIPELINING, - (conn->max_outstanding_requests != 1 && - conn->pipelining == 1) ? "Y" : "N"); - if (status) - return status; - - /* Flag our pollset as dirty now that we have a new socket. */ - serf_io__set_pollset_dirty(&conn->io); - - if (! conn->wait_for_connect) { - status = connect_connection(conn); - - if (status) - return status; - } - } - - return APR_SUCCESS; -} - -static apr_status_t no_more_writes(serf_connection_t *conn) +static apr_status_t no_more_writes(serf_pump_t *pump) { /* Note that we should hold new requests until we open our new socket. */ - conn->state = SERF_CONN_CLOSING; - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "stop writing on conn 0x%x\n", conn); + pump->done_writing = true; + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config, + "stop writing on 0x%x\n", pump->io->u.conn); /* Clear our iovec. */ - conn->vec_len = 0; + pump->vec_len = 0; /* Update the pollset to know we don't want to write on this socket any * more. */ - serf_io__set_pollset_dirty(&conn->io); - return APR_SUCCESS; -} - -/* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if - * the header contains value 'close' indicating the server is closing the - * connection right after this response. - * Otherwise returns APR_SUCCESS. 
- */ -static apr_status_t is_conn_closing(serf_bucket_t *response) -{ - serf_bucket_t *hdrs; - const char *val; - - hdrs = serf_bucket_response_get_headers(response); - val = serf_bucket_headers_get(hdrs, "Connection"); - if (val && strcasecmp("close", val) == 0) - { - return SERF_ERROR_CLOSING; - } - - return APR_SUCCESS; -} - -static apr_status_t remove_connection(serf_context_t *ctx, - serf_connection_t *conn) -{ - apr_pollfd_t desc = { 0 }; - - desc.desc_type = APR_POLL_SOCKET; - desc.desc.s = conn->skt; - desc.reqevents = conn->io.reqevents; - - return ctx->pollset_rm(ctx->pollset_baton, - &desc, &conn->io); -} - -/* A socket was closed, inform the application. */ -static void handle_conn_closed(serf_connection_t *conn, apr_status_t status) -{ - (*conn->closed)(conn, conn->closed_baton, status, - conn->pool); -} - -static apr_status_t reset_connection(serf_connection_t *conn, - int requeue_requests) -{ - serf_context_t *ctx = conn->ctx; - apr_status_t status; - serf_request_t *old_reqs; - - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "reset connection 0x%x\n", conn); - - conn->probable_keepalive_limit = conn->completed_responses; - conn->completed_requests = 0; - conn->completed_responses = 0; - - /* Clear the unwritten_reqs queue, so the application can requeue cancelled - requests on it for the new socket. */ - old_reqs = conn->unwritten_reqs; - conn->unwritten_reqs = NULL; - conn->unwritten_reqs_tail = NULL; - - serf__connection_pre_cleanup(conn); - - /* First, cancel all written requests for which we haven't received a - response yet. Inform the application that the request is cancelled, - so it can requeue them if needed. */ - while (conn->written_reqs) { - serf__cancel_request(conn->written_reqs, &conn->written_reqs, - requeue_requests); - } - conn->written_reqs_tail = NULL; - - /* Handle all outstanding unwritten requests. - TODO: what about a partially written request? 
*/ - while (old_reqs) { - /* If we haven't started to write the connection, bring it over - * unchanged to our new socket. - * Do not copy a CONNECT request to the new connection, the ssl tunnel - * setup code will create a new CONNECT request already. - */ - if (requeue_requests && (old_reqs->writing == SERF_WRITING_NONE) && - !old_reqs->ssltunnel) { - - serf_request_t *req = old_reqs; - old_reqs = old_reqs->next; - req->next = NULL; - serf__link_requests(&conn->unwritten_reqs, - &conn->unwritten_reqs_tail, - req); - } - else { - /* We don't want to requeue the request or this request was partially - written. Inform the application that the request is cancelled. */ - serf__cancel_request(old_reqs, &old_reqs, requeue_requests); - } - } - - /* Requests queue has been prepared for a new socket, close the old one. */ - if (conn->skt != NULL) { - remove_connection(ctx, conn); - status = clean_skt(conn); - if (conn->closed != NULL) { - handle_conn_closed(conn, status); - } - } - - if (conn->stream != NULL) { - serf_bucket_destroy(conn->stream); - conn->stream = NULL; - } - - /* Don't try to resume any writes */ - conn->vec_len = 0; - - serf_io__set_pollset_dirty(&conn->io); - conn->state = SERF_CONN_INIT; - - conn->hit_eof = 0; - conn->connect_time = 0; - conn->latency = -1; - conn->stop_writing = 0; - conn->write_now = 0; - /* conn->pipelining */ - - conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1; - - if (conn->protocol_baton) { - conn->perform_teardown(conn); - conn->protocol_baton = NULL; - } - - conn->perform_read = read_from_connection; - conn->perform_write = write_to_connection; - conn->perform_hangup = hangup_connection; - conn->perform_teardown = NULL; - - conn->status = APR_SUCCESS; - - /* Let our context know that we've 'reset' the socket already. 
*/ - conn->seen_in_pollset |= APR_POLLHUP; - - /* Recalculate the current list length */ - conn->nr_of_written_reqs = 0; - conn->nr_of_unwritten_reqs = serf__req_list_length(conn->unwritten_reqs); - - /* Found the connection. Closed it. All done. */ + serf_io__set_pollset_dirty(pump->io); return APR_SUCCESS; } -static apr_status_t socket_writev(serf_connection_t *conn) +static apr_status_t socket_writev(serf_pump_t *pump) { apr_size_t written; apr_status_t status; + serf_pump_t *conn = pump; - status = apr_socket_sendv(conn->skt, conn->vec, - conn->vec_len, &written); + status = apr_socket_sendv(pump->skt, pump->vec, + pump->vec_len, &written); if (status && !APR_STATUS_IS_EAGAIN(status)) - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config, "socket_sendv error %d\n", status); /* did we write everything? */ @@ -805,18 +235,18 @@ static apr_status_t socket_writev(serf_c serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "\n"); /* Log progress information */ - serf__context_progress_delta(conn->ctx, 0, written); + serf__context_progress_delta(conn->io->ctx, 0, written); } return status; } -apr_status_t serf__connection_flush(serf_connection_t *conn, - int pump) +apr_status_t serf_pump__write(serf_pump_t *pump, + bool fetch_new) { apr_status_t status = APR_SUCCESS; apr_status_t read_status = APR_SUCCESS; - serf_bucket_t *ostreamh = NULL; + serf_pump_t *const conn = pump; conn->hit_eof = FALSE; @@ -841,23 +271,13 @@ apr_status_t serf__connection_flush(serf else if (read_status || conn->vec_len || conn->hit_eof) return read_status; - /* Ok, with the vecs written, we can now refill the per connection - output vecs */ - if (!ostreamh) { - serf_bucket_t *ostreamt; - - status = prepare_conn_streams(conn, &ostreamt, &ostreamh); - if (status) - return status; - } - /* ### optimize at some point by using read_for_sendfile */ /* TODO: now that read_iovec will effectively try to return as much 
data as available, we probably don't want to read ALL_AVAIL, but a lower number, like the size of one or a few TCP packets, the available TCP buffer size ... */ conn->hit_eof = 0; - read_status = serf_bucket_read_iovec(ostreamh, + read_status = serf_bucket_read_iovec(pump->ostream_head, SERF_READ_ALL_AVAIL, IOV_MAX, conn->vec, @@ -873,8 +293,8 @@ apr_status_t serf__connection_flush(serf we can actually write something. otherwise, we could end up in a CPU spin: socket wants something, but we don't have anything (and keep returning EAGAIN) */ - conn->stop_writing = 1; - serf_io__set_pollset_dirty(&conn->io); + conn->stop_writing = true; + serf_io__set_pollset_dirty(conn->io); read_status = APR_EAGAIN; } @@ -894,936 +314,3 @@ apr_status_t serf__connection_flush(serf return status; } -/* Implements serf_bucket_event_callback_t and is called (potentially - more than once) after the request buckets are completely read. - - At this time we know the request is written, but we can't destroy - the buckets yet as they might still be referenced by the connection - vecs. */ -static apr_status_t request_writing_done(void *baton, - apr_uint64_t bytes_read) -{ - serf_request_t *request = baton; - - if (request->writing == SERF_WRITING_STARTED) { - request->writing = SERF_WRITING_DONE; - - /* TODO: Handle request done */ - } - return APR_EOF; /* Done with the event bucket */ -} - - -/* Implements serf_bucket_event_callback_t and is called after the - request buckets are no longer needed. More precisely the outgoing - buckets are already destroyed. 
*/ -static apr_status_t request_writing_finished(void *baton, - apr_uint64_t bytes_read) -{ - serf_request_t *request = baton; - serf_connection_t *conn = request->conn; - - request->req_bkt = NULL; /* Bucket is destroyed by now */ - - if (request->writing == SERF_WRITING_DONE) { - request->writing = SERF_WRITING_FINISHED; - - /* Move the request to the written queue */ - serf__link_requests(&conn->written_reqs, &conn->written_reqs_tail, - request); - conn->nr_of_written_reqs++; - conn->unwritten_reqs = conn->unwritten_reqs->next; - conn->nr_of_unwritten_reqs--; - request->next = NULL; - - /* If our connection has async responses enabled, we're not - * going to get a reply back, so kill the request. - */ - if (conn->async_responses) { - conn->unwritten_reqs = request->next; - conn->nr_of_unwritten_reqs--; - serf__destroy_request(request); - } - - conn->completed_requests++; - } - /* Destroy (all) requests that are now safe to destroy, - Typically non or just the finished one */ - { - serf_request_t *last = NULL; - serf_request_t **rq = &conn->done_reqs; - while (*rq) { - request = *rq; - if ((*rq)->writing == SERF_WRITING_FINISHED) { - request = *rq; - *rq = request->next; - serf__destroy_request(request); - } - else { - last = *rq; - rq = &last->next; - } - } - - conn->done_reqs_tail = last; - } - - return APR_EOF; /* Done with event bucket. Status is ignored */ -} - -/* write data out to the connection */ -static apr_status_t write_to_connection(serf_connection_t *conn) -{ - /* Keep reading and sending until we run out of stuff to read, or - * writing would block. - */ - while (1) { - serf_request_t *request; - apr_status_t status; - apr_status_t read_status; - serf_bucket_t *ostreamt; - serf_bucket_t *ostreamh; - - /* If we have unwritten data in iovecs, then write what we can - directly. 
*/ - status = serf__connection_flush(conn, FALSE); - if (APR_STATUS_IS_EAGAIN(status)) - return APR_SUCCESS; - else if (status) - return status; - - /* If we're setting up an ssl tunnel, we can't send real requests - as yet, as they need to be encrypted and our encrypt buckets - aren't created yet as we still need to read the unencrypted - response of the CONNECT request. */ - if (conn->state == SERF_CONN_SETUP_SSLTUNNEL - && REQS_IN_PROGRESS(conn) > 0) - { - /* But flush out SSL data when necessary! */ - status = serf__connection_flush(conn, TRUE); - if (APR_STATUS_IS_EAGAIN(status)) - return APR_SUCCESS; - - return status; - } - - /* We try to limit the number of in-flight requests so that we - don't have to repeat too many if the connection drops. - - This check matches that in serf__conn_update_pollset() - */ - if ((conn->probable_keepalive_limit && - conn->completed_requests > conn->probable_keepalive_limit) || - (conn->max_outstanding_requests && - REQS_IN_PROGRESS(conn) >= conn->max_outstanding_requests)) { - - serf_io__set_pollset_dirty(&conn->io); - - /* backoff for now. */ - return APR_SUCCESS; - } - - /* We may need to move forward to a request which has something - * to write. - */ - if (!request_or_data_pending(&request, conn)) { - /* No more requests (with data) are registered with the - * connection, and no data is pending on the outgoing stream. - * Let's update the pollset so that we don't try to write to this - * socket again. - */ - serf_io__set_pollset_dirty(&conn->io); - return APR_SUCCESS; - } - - status = prepare_conn_streams(conn, &ostreamt, &ostreamh); - if (status) { - return status; - } - - if (request && request->writing == SERF_WRITING_NONE) { - serf_bucket_t *event_bucket; - - if (request->req_bkt == NULL) { - read_status = serf__setup_request(request); - if (read_status) { - /* Something bad happened. Propagate any errors. 
*/ - return read_status; - } - } - - request->writing = SERF_WRITING_STARTED; - - /* And now add an event bucket to keep track of when the request - has been completely written */ - event_bucket = serf__bucket_event_create(request->req_bkt, - request, - NULL, - request_writing_done, - request_writing_finished, - conn->allocator); - serf_bucket_aggregate_append(ostreamt, event_bucket); - } - - /* If we got some data, then deliver it. */ - /* ### what to do if we got no data?? is that a problem? */ - status = serf__connection_flush(conn, TRUE); - if (APR_STATUS_IS_EAGAIN(status)) - return APR_SUCCESS; - else if (status) - return status; - - } - /* NOTREACHED */ -} - - - -/* An async response message was received from the server. */ -static apr_status_t handle_async_response(serf_connection_t *conn, - apr_pool_t *pool) -{ - apr_status_t status; - - if (conn->current_async_response == NULL) { - conn->current_async_response = - (*conn->async_acceptor)(NULL, conn->stream, - conn->async_acceptor_baton, pool); - } - - status = (*conn->async_handler)(NULL, conn->current_async_response, - conn->async_handler_baton, pool); - - if (APR_STATUS_IS_EOF(status)) { - serf_bucket_destroy(conn->current_async_response); - conn->current_async_response = NULL; - status = APR_SUCCESS; - } - - return status; -} - -/* read data from the connection */ -static apr_status_t read_from_connection(serf_connection_t *conn) -{ - apr_status_t status; - apr_pool_t *tmppool; - apr_status_t close_connection = APR_SUCCESS; - - /* Whatever is coming in on the socket corresponds to the first request - * on our chain. - */ - serf_request_t *request = conn->written_reqs; - if (!request) { - /* Request wasn't completely written yet! */ - request = conn->unwritten_reqs; - } - - /* If the stop_writing flag was set on the connection, reset it now because - there is some data to read. 
*/ - if (conn->stop_writing) { - conn->stop_writing = 0; - serf_io__set_pollset_dirty(&conn->io); - } - - /* assert: request != NULL */ - - if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS) - return status; - - /* Invoke response handlers until we have no more work. */ - while (1) { - serf_bucket_t *dummy1, *dummy2; - - apr_pool_clear(tmppool); - - /* Only interested in the input stream here. */ - status = prepare_conn_streams(conn, &dummy1, &dummy2); - if (status) { - goto error; - } - - /* We have a different codepath when we can have async responses. */ - if (conn->async_responses) { - /* TODO What about socket errors? */ - status = handle_async_response(conn, tmppool); - if (APR_STATUS_IS_EAGAIN(status)) { - status = APR_SUCCESS; - goto error; - } - if (status) { - goto error; - } - continue; - } - - /* We are reading a response for a request we haven't - * written yet! - * - * This shouldn't normally happen EXCEPT: - * - * 1) when the other end has closed the socket and we're - * pending an EOF return. - * 2) Doing the initial SSL handshake - we'll get EAGAIN - * as the SSL buckets will hide the handshake from us - * but not return any data. - * 3) When the server sends us an SSL alert. - * - * In these cases, we should not receive any actual user data. - * - * 4) When the server sends a error response, like 408 Request timeout. - * This response should be passed to the application. - * - * If we see an EOF (due to either an expired timeout or the server - * sending the SSL 'close notify' shutdown alert), we'll reset the - * connection and open a new one. 
- */ - if (request->req_bkt || request->writing == SERF_WRITING_NONE) { - const char *data; - apr_size_t len; - - status = serf_bucket_peek(conn->stream, &data, &len); - - if (APR_STATUS_IS_EOF(status)) { - reset_connection(conn, 1); - status = APR_SUCCESS; - goto error; - } - else if (APR_STATUS_IS_EAGAIN(status) && !len) { - status = APR_SUCCESS; - goto error; - } else if (status && !APR_STATUS_IS_EAGAIN(status)) { - /* Read error */ - goto error; - } - - /* Unexpected response from the server */ - if (conn->write_now) { - conn->write_now = 0; - status = conn->perform_write(conn); - - if (!SERF_BUCKET_READ_ERROR(status)) - status = APR_SUCCESS; - } - } - - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_HTTP1) - break; - - /* If the request doesn't have a response bucket, then call the - * acceptor to get one created. - */ - if (request->resp_bkt == NULL) { - if (! request->acceptor) { - /* Request wasn't even setup. - Server replying before it received anything? */ - return SERF_ERROR_BAD_HTTP_RESPONSE; - } - - request->resp_bkt = (*request->acceptor)(request, conn->stream, - request->acceptor_baton, - tmppool); - apr_pool_clear(tmppool); - - /* Share the configuration with the response bucket(s) */ - serf_bucket_set_config(request->resp_bkt, conn->config); - } - - status = serf__handle_response(request, tmppool); - - /* If we received APR_SUCCESS, run this loop again. */ - if (!status) { - continue; - } - - /* If our response handler says it can't do anything more, we now - * treat that as a success. - */ - if (APR_STATUS_IS_EAGAIN(status)) { - /* It is possible that while reading the response, the ssl layer - has prepared some data to send. If this was the last request, - serf will not check for socket writability, so force this here. 
- */ - if (request_or_data_pending(&request, conn) && !request) { - serf_io__set_pollset_dirty(&conn->io); - } - status = APR_SUCCESS; - goto error; - } - - close_connection = is_conn_closing(request->resp_bkt); - - if (!APR_STATUS_IS_EOF(status) && - close_connection != SERF_ERROR_CLOSING) { - /* Whether success, or an error, there is no more to do unless - * this request has been completed. - */ - goto error; - } - - /* The response has been fully-read, so that means the request has - * either been fully-delivered (most likely), or that we don't need to - * write the rest of it anymore, e.g. when a 408 Request timeout was - $ received. - * Remove it from our queue and loop to read another response. - */ - if (request == conn->written_reqs) { - conn->written_reqs = request->next; - conn->nr_of_written_reqs--; - } else { - conn->unwritten_reqs = request->next; - conn->nr_of_unwritten_reqs--; - } - - serf__destroy_request(request); - - request = conn->written_reqs; - if (!request) { - /* Received responses for all written requests */ - conn->written_reqs_tail = NULL; - /* Request wasn't completely written yet! */ - request = conn->unwritten_reqs; - if (!request) - conn->unwritten_reqs_tail = NULL; - } - - conn->completed_responses++; - - /* We have received a response. If there are no more outstanding - requests on this connection, we should stop polling for READ events - for now. */ - if (!conn->written_reqs && !conn->unwritten_reqs) { - serf_io__set_pollset_dirty(&conn->io); - } - - /* This means that we're being advised that the connection is done. */ - if (close_connection == SERF_ERROR_CLOSING) { - reset_connection(conn, 1); - if (APR_STATUS_IS_EOF(status)) - status = APR_SUCCESS; - goto error; - } - - /* The server is suddenly deciding to serve more responses than we've - * seen before. - * - * Let our requests go. 
- */ - if (conn->probable_keepalive_limit && - conn->completed_responses >= conn->probable_keepalive_limit) { - conn->probable_keepalive_limit = 0; - } - - /* If we just ran out of requests or have unwritten requests, then - * update the pollset. We don't want to read from this socket any - * more. We are definitely done with this loop, too. - */ - if (request == NULL || request->writing == SERF_WRITING_NONE) { - serf_io__set_pollset_dirty(&conn->io); - status = APR_SUCCESS; - goto error; - } - } - -error: - /* ### This code handles some specific errors as a retry. - Eventually we should move to a handling where the application - can tell us if this is really a good idea for specific requests */ - - if (status == SERF_ERROR_SSL_NEGOTIATE_IN_PROGRESS) { - /* This connection uses HTTP pipelining and the server asked for a - renegotiation (e.g. to access the requested resource a specific - client certificate is required). - - Because of a known problem in OpenSSL this won't work most of the - time, so as a workaround, when the server asks for a renegotiation - on a connection using HTTP pipelining, we reset the connection, - disable pipelining and reconnect to the server. */ - serf__log(LOGLVL_WARNING, LOGCOMP_CONN, __FILE__, conn->config, - "The server requested renegotiation. Disable HTTP " - "pipelining and reset the connection.\n", conn); - - serf__connection_set_pipelining(conn, 0); - reset_connection(conn, 1); - status = APR_SUCCESS; - } - else if (status == SERF_ERROR_REQUEST_LOST - || APR_STATUS_IS_ECONNRESET(status) - || APR_STATUS_IS_ECONNABORTED(status)) { - - /* Some systems will not generate a HUP poll event for these errors - so we handle the ECONNRESET issue and ECONNABORT here. */ - - /* If the connection was ever good, be optimistic & try again. - If it has never tried again (incl. a retry), fail. 
*/ - if (conn->completed_responses) { - reset_connection(conn, 1); - status = APR_SUCCESS; - } - else if (status == SERF_ERROR_REQUEST_LOST) { - status = SERF_ERROR_ABORTED_CONNECTION; - } - } - - apr_pool_destroy(tmppool); - return status; -} - -/* The connection got reset by the server. On Windows this can happen - when all data is read, so just cleanup the connection and open a new one. - - If we haven't had any successful responses on this connection, - then error out as it is likely a server issue. */ -static apr_status_t hangup_connection(serf_connection_t *conn) -{ - if (conn->completed_responses) { - return reset_connection(conn, 1); - } - - return SERF_ERROR_ABORTED_CONNECTION; -} - -/* process all events on the connection */ -static apr_status_t process_connection(serf_connection_t *conn, - apr_int16_t events) -{ - apr_status_t status; -#ifdef SERF_DEBUG_BUCKET_USE - serf_request_t *rq; -#endif - -#ifdef SERF_DEBUG_BUCKET_USE - serf_debug__entered_loop(conn->allocator); - - for (rq = conn->written_reqs; rq; rq = rq->next) { - if (rq->allocator) - serf_debug__entered_loop(rq->allocator); - } - - for (rq = conn->done_reqs; rq; rq = rq->next) { - if (rq->allocator) - serf_debug__entered_loop(rq->allocator); - } -#endif - - /* POLLHUP/ERR should come after POLLIN so if there's an error message or - * the like sitting on the connection, we give the app a chance to read - * it before we trigger a reset condition. - */ - if ((events & APR_POLLIN) != 0 - && !conn->wait_for_connect) { - - if ((status = conn->perform_read(conn)) != APR_SUCCESS) - return status; - - /* If we decided to reset our connection, return now as we don't - * want to write. - */ - if ((conn->seen_in_pollset & APR_POLLHUP) != 0) { - return APR_SUCCESS; - } - } - if ((events & APR_POLLHUP) != 0) { - /* The connection got reset by the server. 
*/ - return conn->perform_hangup(conn); - } - if ((events & APR_POLLERR) != 0) { - /* We might be talking to a buggy HTTP server that doesn't - * do lingering-close. (httpd < 2.1.8 does this.) - * - * See: - * - * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292 - */ - if (conn->completed_requests && !conn->probable_keepalive_limit) { - return reset_connection(conn, 1); - } -#ifdef SO_ERROR - /* If possible, get the error from the platform's socket layer and - convert it to an APR status code. */ - { - apr_os_sock_t osskt; - if (!apr_os_sock_get(&osskt, conn->skt)) { - int error; - apr_socklen_t l = sizeof(error); - - if (!getsockopt(osskt, SOL_SOCKET, SO_ERROR, (char*)&error, - &l)) { - status = APR_FROM_OS_ERROR(error); - - /* Handle fallback for multi-homed servers. - - ### Improve algorithm to find better than just 'next'? - - Current Windows versions already handle re-ordering for - api users by using statistics on the recently failed - connections to order the list of addresses. */ - if (conn->completed_requests == 0 - && conn->address->next != NULL - && (APR_STATUS_IS_ECONNREFUSED(status) - || APR_STATUS_IS_TIMEUP(status) - || APR_STATUS_IS_ENETUNREACH(status))) { - - conn->address = conn->address->next; - return reset_connection(conn, 1); - } - - return status; - } - } - } -#endif - return APR_EGENERAL; - } - if ((events & APR_POLLOUT) != 0) { - if (conn->wait_for_connect) { - conn->wait_for_connect = FALSE; - - /* We are now connected. 
Socket is now usable */ - serf_io__set_pollset_dirty(&conn->io); - - if ((status = connect_connection(conn)) != APR_SUCCESS) - return status; - } - - if ((status = conn->perform_write(conn)) != APR_SUCCESS) - return status; - } - return APR_SUCCESS; -} - -apr_status_t serf__process_connection(serf_connection_t *conn, - apr_int16_t events) -{ - serf_context_t *ctx = conn->ctx; - apr_pollfd_t tdesc = { 0 }; - - /* If this connection has already failed, return the error again, and try - * to remove it from the pollset again - */ - if (conn->status) { - tdesc.desc_type = APR_POLL_SOCKET; - tdesc.desc.s = conn->skt; - tdesc.reqevents = conn->io.reqevents; - ctx->pollset_rm(ctx->pollset_baton, - &tdesc, &conn->io); - return conn->status; - } - /* apr_pollset_poll() can return a conn multiple times... */ - if ((conn->seen_in_pollset & events) != 0 || - (conn->seen_in_pollset & APR_POLLHUP) != 0) { - return APR_SUCCESS; - } - - conn->seen_in_pollset |= events; - - if ((conn->status = process_connection(conn, events)) != APR_SUCCESS) - { - /* it's possible that the connection was already reset and thus the - socket cleaned up. */ - if (conn->skt) { - tdesc.desc_type = APR_POLL_SOCKET; - tdesc.desc.s = conn->skt; - tdesc.reqevents = conn->io.reqevents; - ctx->pollset_rm(ctx->pollset_baton, - &tdesc, &conn->io); - } - return conn->status; - } - return APR_SUCCESS; -} - -serf_connection_t *serf_connection_create( - serf_context_t *ctx, - apr_sockaddr_t *address, - serf_connection_setup_t setup, - void *setup_baton, - serf_connection_closed_t closed, - void *closed_baton, - apr_pool_t *pool) -{ - serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn)); - - conn->ctx = ctx; - conn->status = APR_SUCCESS; - /* Ignore server address if proxy was specified. */ - conn->address = ctx->proxy_address ? 
ctx->proxy_address : address; - conn->setup = setup; - conn->setup_baton = setup_baton; - conn->closed = closed; - conn->closed_baton = closed_baton; - conn->pool = pool; - conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL); - conn->stream = NULL; - conn->ostream_head = NULL; - conn->ostream_tail = NULL; - conn->io.type = SERF_IO_CONN; - conn->io.u.conn = conn; - conn->io.ctx = ctx; - conn->io.dirty_conn = false; - conn->io.reqevents = 0; - conn->hit_eof = 0; - conn->state = SERF_CONN_INIT; - conn->latency = -1; /* unknown */ - conn->stop_writing = 0; - conn->write_now = 0; - conn->wait_for_connect = 0; - conn->pipelining = 1; - conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1; - conn->perform_read = read_from_connection; - conn->perform_write = write_to_connection; - conn->perform_hangup = hangup_connection; - conn->perform_teardown = NULL; - conn->protocol_baton = NULL; - - conn->written_reqs = conn->written_reqs_tail = NULL; - conn->nr_of_written_reqs = 0; - - conn->unwritten_reqs = conn->unwritten_reqs_tail = NULL; - conn->nr_of_unwritten_reqs; - - conn->done_reqs = conn->done_reqs_tail = 0; - - /* Create a subpool for our connection. */ - apr_pool_create(&conn->skt_pool, conn->pool); - - /* register a cleanup */ - apr_pool_cleanup_register(conn->pool, conn, clean_conn, - apr_pool_cleanup_null); - - /* Add the connection to the context. */ - *(serf_connection_t **)apr_array_push(ctx->conns) = conn; - - return conn; -} - -apr_status_t serf_connection_create2( - serf_connection_t **conn, - serf_context_t *ctx, - apr_uri_t host_info, - serf_connection_setup_t setup, - void *setup_baton, - serf_connection_closed_t closed, - void *closed_baton, - apr_pool_t *pool) -{ - apr_status_t status = APR_SUCCESS; - serf_config_t *config; - serf_connection_t *c; - apr_sockaddr_t *host_address = NULL; - - /* Set the port number explicitly, needed to create the socket later. 
*/ - if (!host_info.port) { - host_info.port = apr_uri_port_of_scheme(host_info.scheme); - } - - /* Only lookup the address of the server if no proxy server was - configured. */ - if (!ctx->proxy_address) { - status = apr_sockaddr_info_get(&host_address, - host_info.hostname, - APR_UNSPEC, host_info.port, 0, pool); - if (status) - return status; - } - - c = serf_connection_create(ctx, host_address, setup, setup_baton, - closed, closed_baton, pool); - - /* We're not interested in the path following the hostname. */ - c->host_url = apr_uri_unparse(c->pool, - &host_info, - APR_URI_UNP_OMITPATHINFO | - APR_URI_UNP_OMITUSERINFO); - - /* Store the host info without the path on the connection. */ - (void)apr_uri_parse(c->pool, c->host_url, &(c->host_info)); - if (!c->host_info.port) { - c->host_info.port = apr_uri_port_of_scheme(c->host_info.scheme); - } - - /* Store the connection specific info in the configuration store */ - status = serf__config_store_get_config(ctx, c, &config, pool); - if (status) - return status; - c->config = config; - serf_config_set_stringc(config, SERF_CONFIG_HOST_NAME, - c->host_info.hostname); - serf_config_set_stringc(config, SERF_CONFIG_HOST_PORT, - apr_itoa(ctx->pool, c->host_info.port)); - - *conn = c; - - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, c->config, - "created connection 0x%x\n", c); - - return status; -} - -apr_status_t serf_connection_reset( - serf_connection_t *conn) -{ - return reset_connection(conn, 0); -} - - -apr_status_t serf_connection_close( - serf_connection_t *conn) -{ - int i; - serf_context_t *ctx = conn->ctx; - apr_status_t status; - - for (i = ctx->conns->nelts; i--; ) { - serf_connection_t *conn_seq = GET_CONN(ctx, i); - - if (conn_seq == conn) { - - /* Clean up the write bucket first, as this marks all partially written - requests as fully written, allowing more efficient cleanup */ - serf__connection_pre_cleanup(conn); - - /* The application asked to close the connection, no need to notify - it for each 
cancelled request. */ - while (conn->written_reqs) { - serf__cancel_request(conn->written_reqs, &conn->written_reqs, 0); - } - while (conn->unwritten_reqs) { - serf__cancel_request(conn->unwritten_reqs, &conn->unwritten_reqs, 0); - } - if (conn->skt != NULL) { - remove_connection(ctx, conn); - status = clean_skt(conn); - if (conn->closed != NULL) { - handle_conn_closed(conn, status); - } - } - if (conn->stream != NULL) { - serf_bucket_destroy(conn->stream); - conn->stream = NULL; - } - - if (conn->protocol_baton) { - conn->perform_teardown(conn); - conn->protocol_baton = NULL; - } - - /* Remove the connection from the context. We don't want to - * deal with it any more. - */ - if (i < ctx->conns->nelts - 1) { - /* move later connections over this one. */ - memmove( - &GET_CONN(ctx, i), - &GET_CONN(ctx, i + 1), - (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *)); - } - --ctx->conns->nelts; - - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "closed connection 0x%x\n", conn); - - /* Found the connection. Closed it. All done. */ - return APR_SUCCESS; - } - } - - /* We didn't find the specified connection. */ - /* ### doc talks about this w.r.t poll structures. use something else? */ - return APR_NOTFOUND; -} - - -void serf_connection_set_max_outstanding_requests( - serf_connection_t *conn, - unsigned int max_requests) -{ - if (max_requests == 0) - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "Set max. nr. of outstanding requests for this " - "connection to unlimited.\n"); - else - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "Limit max. nr. of outstanding requests for this " - "connection to %u.\n", max_requests); - - conn->max_outstanding_requests = max_requests; -} - -/* Disable HTTP pipelining, ensure that only one request is outstanding at a - time. 
This is an internal method, an application that wants to disable - HTTP pipelining can achieve this by calling: - serf_connection_set_max_outstanding_requests(conn, 1) . - */ -void serf__connection_set_pipelining(serf_connection_t *conn, int enabled) -{ - conn->pipelining = enabled; -} - -void serf_connection_set_async_responses( - serf_connection_t *conn, - serf_response_acceptor_t acceptor, - void *acceptor_baton, - serf_response_handler_t handler, - void *handler_baton) -{ - conn->async_responses = 1; - conn->async_acceptor = acceptor; - conn->async_acceptor_baton = acceptor_baton; - conn->async_handler = handler; - conn->async_handler_baton = handler_baton; -} - -void serf_connection_set_framing_type( - serf_connection_t *conn, - serf_connection_framing_type_t framing_type) -{ - conn->framing_type = framing_type; - - if (conn->skt) { - serf_io__set_pollset_dirty(&conn->io); - conn->stop_writing = 0; - conn->write_now = 1; - - /* Close down existing protocol */ - if (conn->protocol_baton) { - conn->perform_teardown(conn); - conn->protocol_baton = NULL; - } - } - - /* Reset to default */ - conn->perform_read = read_from_connection; - conn->perform_write = write_to_connection; - conn->perform_hangup = hangup_connection; - conn->perform_teardown = NULL; - - switch (framing_type) { - case SERF_CONNECTION_FRAMING_TYPE_HTTP2: - serf__http2_protocol_init(conn); - break; - default: - break; - } -} - -apr_interval_time_t serf_connection_get_latency(serf_connection_t *conn) -{ - if (conn->ctx->proxy_address) { - /* Detecting network latency for proxied connection is not implemented - yet. 
*/ - return -1; - } - - return conn->latency; -} - -unsigned int serf_connection_queued_requests(serf_connection_t *conn) -{ - return conn->nr_of_unwritten_reqs; -} - -unsigned int serf_connection_pending_requests(serf_connection_t *conn) -{ - return conn->nr_of_unwritten_reqs + conn->nr_of_written_reqs; -} Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715005&r1=1715004&r2=1715005&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Wed Nov 18 14:45:58 2015 @@ -147,6 +147,33 @@ typedef struct serf_io_baton_t { } serf_io_baton_t; +typedef struct serf_pump_io_t +{ + serf_io_baton_t *io; + + serf_bucket_alloc_t *allocator; + serf_config_t *config; + + serf_bucket_t *stream; + serf_bucket_t *ostream_head; + serf_bucket_t *ostream_tail; + + apr_socket_t *skt; + + /* Outgoing vecs, waiting to be written. + Read from ostream_head */ + struct iovec vec[IOV_MAX]; + int vec_len; + + /* True when connection failed while writing */ + bool done_writing; + bool stop_writing; /* Wait for read (E.g. SSL) */ + + /* Set to true when ostream_tail was read to EOF */ + bool hit_eof; +} serf_pump_t; + + /* Should we use static APR_INLINE instead? */ #define serf_io__set_pollset_dirty(io_baton) \ do \ @@ -381,6 +408,7 @@ struct serf_incoming_t { serf_context_t *ctx; serf_io_baton_t io; + serf_pump_t pump; serf_incoming_request_setup_t req_setup; void *req_setup_baton; @@ -403,8 +431,6 @@ struct serf_incoming_t { serf_connection_framing_type_t framing_type; bool wait_for_connect; - bool hit_eof; - bool stop_writing; /* Event callbacks, called from serf__process_client() to do the actual processing. */ @@ -416,13 +442,6 @@ struct serf_incoming_t { void(*perform_teardown)(serf_incoming_t *conn); void *protocol_baton; - /* A bucket wrapped around our socket (for reading responses). 
*/ - serf_bucket_t *stream; - /* A reference to the aggregate bucket that provides the boundary between - * request level buckets and connection level buckets. - */ - serf_bucket_t *ostream_head; - serf_bucket_t *ostream_tail; serf_config_t *config; @@ -742,6 +761,25 @@ void serf__link_requests(serf_request_t apr_status_t serf__handle_response(serf_request_t *request, apr_pool_t *pool); +/* From pump.c */ +void serf_pump__init(serf_pump_t *pump, + serf_io_baton_t *io, + apr_socket_t *skt, + serf_config_t *config, + serf_bucket_alloc_t *allocator, + apr_pool_t *pool); + +bool serf_pump__data_pending(serf_pump_t *pump); +void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump); + +apr_status_t serf_pump__write(serf_pump_t *pump, + bool fetch_new); + +/* These must always be called as a pair to avoid a memory leak */ +void serf_pump__prepare_setup(serf_pump_t *pump); +void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t *ostream); + + /** Logging functions. **/ /* Initialize the logging subsystem. This will store a log baton in the