Author: rhuijben Date: Mon Nov 23 17:03:04 2015 New Revision: 1715887 URL: http://svn.apache.org/viewvc?rev=1715887&view=rev Log: * incoming.c On the investigate branch: switch back to the old writing code, from before introducing the pump.
Modified: serf/branches/pump-investigate/incoming.c Modified: serf/branches/pump-investigate/incoming.c URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/incoming.c?rev=1715887&r1=1715886&r2=1715887&view=diff ============================================================================== --- serf/branches/pump-investigate/incoming.c (original) +++ serf/branches/pump-investigate/incoming.c Mon Nov 23 17:03:04 2015 @@ -350,10 +350,144 @@ static apr_status_t read_from_client(ser return status; } /* Sends the client->vec_len iovecs in client->vec with apr_socket_sendv() and returns the status of that call. When bytes were written they are logged and removed from client->vec: fully sent entries are dropped, a partially sent entry is advanced past the sent bytes, and vec_len becomes 0 once everything was sent. NOTE(review): written is an apr_size_t logged with %d, and it is read even when the send reported an error - presumably apr_socket_sendv() always stores a count there; confirm. */ +static apr_status_t socket_writev(serf_incoming_t *client) +{ + apr_size_t written; + apr_status_t status; + + status = apr_socket_sendv(client->skt, client->vec, + client->vec_len, &written); + if (status && !APR_STATUS_IS_EAGAIN(status)) + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, + "socket_sendv error %d\n", status); + + /* Did we write anything? If so, log it and drop the sent bytes from the vecs. */ + if (written) { + apr_size_t len = 0; + int i; + + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, + "--- socket_sendv: %d bytes. 
--\n", written); + + for (i = 0; i < client->vec_len; i++) { + len += client->vec[i].iov_len; + if (written < len) { /* The write ended inside vec[i]: log the sent part and keep the unsent tail at the front of client->vec. */ + serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, + "%.*s", client->vec[i].iov_len - (len - written), + client->vec[i].iov_base); + if (i) { + memmove(client->vec, &client->vec[i], + sizeof(struct iovec) * (client->vec_len - i)); + client->vec_len -= i; + } + client->vec[0].iov_base = (char *)client->vec[0].iov_base + (client->vec[0].iov_len - (len - written)); + client->vec[0].iov_len = len - written; + break; + } else { + serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, + "%.*s", + client->vec[i].iov_len, client->vec[i].iov_base); + } + } + if (len == written) { + client->vec_len = 0; + } + serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, "\n"); + + /* Log progress information */ + serf__context_progress_delta(client->ctx, 0, written); + } + + return status; +} + /* Abandons pending output for this client: logs the event, clears client->vec_len and calls serf_io__set_pollset_dirty() on the client io. Always returns APR_SUCCESS. */ +static apr_status_t no_more_writes(serf_incoming_t *client) +{ + /* Note that we should hold new requests until we open our new socket. */ + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, + "stop writing on client 0x%x\n", client); + + /* Clear our iovec. */ + client->vec_len = 0; + + /* Update the pollset to know we don't want to write on this socket any + * more. + */ + serf_io__set_pollset_dirty(&client->io); + return APR_SUCCESS; +} + /* Flushes pending output to the client socket. Queued iovecs in client->vec are written first; when PUMP is true the loop then refills client->vec from client->pump.ostream_head with serf_bucket_read_iovec() and writes again, until a write or read yields a non-success status or client->pump.hit_eof is found set. EPIPE, ECONNRESET and ECONNABORTED from the write are turned into the APR_SUCCESS result of no_more_writes(). */ apr_status_t serf__incoming_client_flush(serf_incoming_t *client, bool pump) { - return serf_pump__write(&client->pump, pump); + apr_status_t status = APR_SUCCESS; + apr_status_t read_status = APR_SUCCESS; + serf_bucket_t *ostreamh = client->pump.ostream_head; + + client->pump.hit_eof = FALSE; + + while (status == APR_SUCCESS) { + + /* First try to write out what is already stored in the + connection vecs. */ + while (client->vec_len && !status) { + status = socket_writev(client); + + /* If the write failed with EPIPE, ECONNRESET or ECONNABORTED, then we're done. + * Don't try to write anything else to the socket. 
+ */ + if (APR_STATUS_IS_EPIPE(status) + || APR_STATUS_IS_ECONNRESET(status) + || APR_STATUS_IS_ECONNABORTED(status)) + return no_more_writes(client); + } + + /* Any other non-success write status, EAGAIN included, is returned as-is; without PUMP we stop after this write pass. */ if (status || !pump) + return status; + else if (read_status || client->vec_len || client->pump.hit_eof) + return read_status; + + /* ### optimize at some point by using read_for_sendfile */ + /* TODO: now that read_iovec will effectively try to return as much + data as available, we probably don't want to read ALL_AVAIL, but + a lower number, like the size of one or a few TCP packets, the + available TCP buffer size ... */ + client->pump.hit_eof = 0; + read_status = serf_bucket_read_iovec(ostreamh, + SERF_READ_ALL_AVAIL, + IOV_MAX, + client->vec, + &client->vec_len); + + if (read_status == SERF_ERROR_WAIT_CONN) { + /* The bucket told us that it can't provide more data until + more data is read from the socket. This normally happens + during a SSL handshake. + + We should avoid looking for writability for a while so + that (hopefully) something will appear in the bucket so + we can actually write something. otherwise, we could + end up in a CPU spin: socket wants something, but we + don't have anything (and keep returning EAGAIN) */ + client->pump.stop_writing = true; + serf_io__set_pollset_dirty(&client->io); + + read_status = APR_EAGAIN; + } + else if (APR_STATUS_IS_EAGAIN(read_status)) { + + /* We read some stuff, but did we read everything ? */ + if (client->pump.hit_eof) + read_status = APR_SUCCESS; + } + else if (SERF_BUCKET_READ_ERROR(read_status)) { + + /* Something bad happened. Propagate any errors. 
*/ + return read_status; + } + } + + return status; } static apr_status_t write_to_client(serf_incoming_t *client) @@ -387,7 +521,7 @@ void serf_incoming_set_framing_type( if (client->skt) { serf_io__set_pollset_dirty(&client->io); - client->pump.stop_writing = false; + client->pump.stop_writing = 0; /* Close down existing protocol */ if (client->protocol_baton && client->perform_teardown) { @@ -714,7 +848,6 @@ apr_status_t serf__incoming_update_polls } client->ctx->incomings->nelts--; apr_pool_destroy(client->pool); -#endif if (cid >= ctx->incomings->nelts) { /* We skipped updating the pollset on this item as we moved it. @@ -722,6 +855,7 @@ apr_status_t serf__incoming_update_polls return serf__incoming_update_pollset(GET_INCOMING(ctx, cid)); } +#endif return APR_SUCCESS; } @@ -752,8 +886,38 @@ apr_status_t serf__incoming_update_polls But it also has the nice side effect of removing references from the aggregate to requests that are done. */ + if (client->vec_len) { + /* We still have vecs in the connection, which lifetime is + managed by buckets inside client->ostream_head. - data_waiting = serf_pump__data_pending(&client->pump); + Don't touch ostream as that might destroy the vecs */ + + data_waiting = true; + } + else { + serf_bucket_t *ostream; + + ostream = client->pump.ostream_head; + + if (ostream) { + const char *dummy_data; + apr_size_t len; + + status = serf_bucket_peek(ostream, &dummy_data, &len); + + if (SERF_BUCKET_READ_ERROR(status) || len > 0) { + /* DATA or error waiting */ + data_waiting = TRUE; /* Error waiting */ + } + else if (! status || APR_STATUS_IS_EOF(status)) { + data_waiting = FALSE; + } + else + data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */ + } + else + data_waiting = FALSE; + } if (data_waiting) { desc.reqevents |= APR_POLLOUT;