Author: rhuijben Date: Wed Nov 18 17:43:23 2015 New Revision: 1715026 URL: http://svn.apache.org/viewvc?rev=1715026&view=rev Log: Use the pump to write data for outgoing connections, by wrapping the pump writer with a helper that switches streams in a few nasty places.
* outgoing.c (no_more_writes): Remove function. (serf__connection_flush) Reimplement as wrapper around serf_pump__write. * pump.c (serf_pump__write): Complete variable rename and fix using the wrong 'pump' variable (line 296). * serf_private.h (serf__connection_flush): Use bool argument. Modified: serf/trunk/outgoing.c serf/trunk/pump.c serf/trunk/serf_private.h Modified: serf/trunk/outgoing.c URL: http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715026&r1=1715025&r2=1715026&view=diff ============================================================================== --- serf/trunk/outgoing.c (original) +++ serf/trunk/outgoing.c Wed Nov 18 17:43:23 2015 @@ -485,23 +485,6 @@ apr_status_t serf__open_connections(serf return APR_SUCCESS; } -static apr_status_t no_more_writes(serf_connection_t *conn) -{ - /* Note that we should hold new requests until we open our new socket. */ - conn->state = SERF_CONN_CLOSING; - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "stop writing on conn 0x%x\n", conn); - - /* Clear our iovec. */ - conn->pump.vec_len = 0; - - /* Update the pollset to know we don't want to write on this socket any - * more. - */ - serf_io__set_pollset_dirty(&conn->io); - return APR_SUCCESS; -} - /* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if * the header contains value 'close' indicating the server is closing the * connection right after this response. @@ -650,137 +633,33 @@ static apr_status_t reset_connection(ser return APR_SUCCESS; } -static apr_status_t socket_writev(serf_pump_t *conn) -{ - apr_size_t written; - apr_status_t status; - - status = apr_socket_sendv(conn->skt, conn->vec, - conn->vec_len, &written); - if (status && !APR_STATUS_IS_EAGAIN(status)) - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "socket_sendv error %d\n", status); - - /* did we write everything? */ - if (written) { - apr_size_t len = 0; - int i; - - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "--- socket_sendv: %d bytes. --\n", written); - - for (i = 0; i < conn->vec_len; i++) { - len += conn->vec[i].iov_len; - if (written < len) { - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, - "%.*s", conn->vec[i].iov_len - (len - written), - conn->vec[i].iov_base); - if (i) { - memmove(conn->vec, &conn->vec[i], - sizeof(struct iovec) * (conn->vec_len - i)); - conn->vec_len -= i; - } - conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written)); - conn->vec[0].iov_len = len - written; - break; - } else { - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, - "%.*s", - conn->vec[i].iov_len, conn->vec[i].iov_base); - } - } - if (len == written) { - conn->vec_len = 0; - } - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "\n"); - - /* Log progress information */ - serf__context_progress_delta(conn->io->ctx, 0, written); - } - - return status; -} apr_status_t serf__connection_flush(serf_connection_t *conn, - int pump) + bool fetch_new) { - apr_status_t status = APR_SUCCESS; - apr_status_t read_status = APR_SUCCESS; - serf_bucket_t *ostreamh = NULL; - - conn->pump.hit_eof = FALSE; - - while (status == APR_SUCCESS) { - - /* First try to write out what is already stored in the - connection vecs. */ - while (conn->pump.vec_len && !status) { - status = socket_writev(&conn->pump); - - /* If the write would have blocked, then we're done. - * Don't try to write anything else to the socket. - */ - if (APR_STATUS_IS_EPIPE(status) - || APR_STATUS_IS_ECONNRESET(status) - || APR_STATUS_IS_ECONNABORTED(status)) - return no_more_writes(conn); - } + apr_status_t status; + serf_bucket_t *tmp_bkt = NULL; - if (status || !pump) - return status; - else if (read_status || conn->pump.vec_len || conn->pump.hit_eof) - return read_status; + if (fetch_new) { + serf_bucket_t *ostreamh, *ostreamt; - /* Ok, with the vecs written, we can now refill the per connection - output vecs */ - if (!ostreamh) { - serf_bucket_t *ostreamt; - - status = prepare_conn_streams(conn, &ostreamt, &ostreamh); - if (status) - return status; - } + status = prepare_conn_streams(conn, &ostreamt, &ostreamh); + if (status) + return status; - /* ### optimize at some point by using read_for_sendfile */ - /* TODO: now that read_iovec will effectively try to return as much - data as available, we probably don't want to read ALL_AVAIL, but - a lower number, like the size of one or a few TCP packets, the - available TCP buffer size ... */ - conn->pump.hit_eof = 0; - read_status = serf_bucket_read_iovec(ostreamh, - SERF_READ_ALL_AVAIL, - IOV_MAX, - conn->pump.vec, - &conn->pump.vec_len); - - if (read_status == SERF_ERROR_WAIT_CONN) { - /* The bucket told us that it can't provide more data until - more data is read from the socket. This normally happens - during a SSL handshake. - - We should avoid looking for writability for a while so - that (hopefully) something will appear in the bucket so - we can actually write something. otherwise, we could - end up in a CPU spin: socket wants something, but we - don't have anything (and keep returning EAGAIN) */ - conn->pump.stop_writing = 1; - serf_io__set_pollset_dirty(&conn->io); + tmp_bkt = conn->pump.ostream_head; + conn->pump.ostream_head = ostreamh; + } - read_status = APR_EAGAIN; - } - else if (APR_STATUS_IS_EAGAIN(read_status)) { + status = serf_pump__write(&conn->pump, fetch_new); - /* We read some stuff, but did we read everything ? */ - if (conn->pump.hit_eof) - read_status = APR_SUCCESS; - } - else if (SERF_BUCKET_READ_ERROR(read_status)) { - - /* Something bad happened. Propagate any errors. */ - return read_status; - } + if (fetch_new) { + conn->pump.ostream_head = tmp_bkt; } + if (conn->pump.done_writing) + conn->state = SERF_CONN_CLOSING; + return status; } Modified: serf/trunk/pump.c URL: http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715026&r1=1715025&r2=1715026&view=diff ============================================================================== --- serf/trunk/pump.c (original) +++ serf/trunk/pump.c Wed Nov 18 17:43:23 2015 @@ -274,16 +274,15 @@ apr_status_t serf_pump__write(serf_pump_ { apr_status_t status = APR_SUCCESS; apr_status_t read_status = APR_SUCCESS; - serf_pump_t *const conn = pump; - conn->hit_eof = FALSE; + pump->hit_eof = FALSE; while (status == APR_SUCCESS) { /* First try to write out what is already stored in the connection vecs. */ - while (conn->vec_len && !status) { - status = socket_writev(conn); + while (pump->vec_len && !status) { + status = socket_writev(pump); /* If the write would have blocked, then we're done. * Don't try to write anything else to the socket. @@ -291,12 +290,12 @@ apr_status_t serf_pump__write(serf_pump_ if (APR_STATUS_IS_EPIPE(status) || APR_STATUS_IS_ECONNRESET(status) || APR_STATUS_IS_ECONNABORTED(status)) - return no_more_writes(conn); + return no_more_writes(pump); } - if (status || !pump) + if (status || !fetch_new) return status; - else if (read_status || conn->vec_len || conn->hit_eof) + else if (read_status || pump->vec_len || pump->hit_eof) return read_status; /* ### optimize at some point by using read_for_sendfile */ @@ -304,12 +303,12 @@ apr_status_t serf_pump__write(serf_pump_ data as available, we probably don't want to read ALL_AVAIL, but a lower number, like the size of one or a few TCP packets, the available TCP buffer size ... */ - conn->hit_eof = 0; + pump->hit_eof = false; read_status = serf_bucket_read_iovec(pump->ostream_head, SERF_READ_ALL_AVAIL, IOV_MAX, - conn->vec, - &conn->vec_len); + pump->vec, + &pump->vec_len); if (read_status == SERF_ERROR_WAIT_CONN) { /* The bucket told us that it can't provide more data until @@ -321,15 +320,15 @@ apr_status_t serf_pump__write(serf_pump_ we can actually write something. otherwise, we could end up in a CPU spin: socket wants something, but we don't have anything (and keep returning EAGAIN) */ - conn->stop_writing = true; - serf_io__set_pollset_dirty(conn->io); + pump->stop_writing = true; + serf_io__set_pollset_dirty(pump->io); read_status = APR_EAGAIN; } else if (APR_STATUS_IS_EAGAIN(read_status)) { /* We read some stuff, but did we read everything ? */ - if (conn->hit_eof) + if (pump->hit_eof) read_status = APR_SUCCESS; } else if (SERF_BUCKET_READ_ERROR(read_status)) { @@ -341,4 +340,3 @@ apr_status_t serf_pump__write(serf_pump_ return status; } - Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715026&r1=1715025&r2=1715026&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Wed Nov 18 17:43:23 2015 @@ -682,7 +682,7 @@ serf_request_t *serf__ssltunnel_request_ void *setup_baton); void serf__connection_set_pipelining(serf_connection_t *conn, int enabled); apr_status_t serf__connection_flush(serf_connection_t *conn, - int pump); + bool fetch_new); apr_status_t serf__provide_credentials(serf_context_t *ctx, char **username,