If anybody can spot a possible cause of a bus error on the MAC bots in this, please let me know.
I'm unable to reproduce anything like this problem on Windows, FreeBSD, OpenSUSE or Debian. But all 4 mac bots produce a bus error since this release. (I wish I knew about this earlier, but the bot was down until last night) Bert > -----Original Message----- > From: rhuij...@apache.org [mailto:rhuij...@apache.org] > Sent: woensdag 18 november 2015 15:46 > To: dev@serf.apache.org > Subject: svn commit: r1715005 - in /serf/trunk: incoming.c > protocols/fcgi_protocol.c protocols/http2_protocol.c pump.c serf_private.h > > Author: rhuijben > Date: Wed Nov 18 14:45:58 2015 > New Revision: 1715005 > > URL: http://svn.apache.org/viewvc?rev=1715005&view=rev > Log: > Introduce a 'pump' layer that contains the stream pumping logic that was > originally only part of outgoing.c, but is now partially duplicated in > incoming.c. > > The implementation is currently directly copied (with svn history) from > the outgoing connection, but this patch only uses it for the incoming > connections yet. > > * io.c > New file copied from outgoing.c. Removing parts that are not necessary > and making things serf private where needed. This was committed as pump.c. Propedited later. > > * incoming.c > (client_detect_eof): Remove function. > (client_connected): Use several pump functions to avoid duplicated code. > (http1_enqueue_reponse, > perform_peek_protocol, > read_from_client): Update usage. > (socket_writev, > no_more_writes): Remove functions. > (serf__incoming_client_flush): Replace implementation with call to > serf_pump__write(). > (serf_incoming_set_framing_type): Update usage. > (serf_incoming_create2): Init pump. > (serf__incoming_update_pollset): Use data pending helper. > > * protocols/fcgi_protocol.c > (fcgi_server_read, > fcgi_server_write, > fcgi_server_teardown): Update usage. > > * protocols/http2_protocol.c > (serf__http2_protocol_init_server, > http2_incoming_read): Update usage. > > * pump.c > (pump_cleanup): New function. > (serf_pump__init): New function. > > (data_pending): Turn into... > (serf_pump__data_pending): ... this. > > (detect_eof): Use extended baton. Return final EOF. > > (do_conn_setup): Split into... > (serf_pump__prepare_setup): ... this and > (serf_pump__complete_setup): ... this. > > (serf__connection_flush): Tweak to... > (serf_pump__write): ... this. > > * serf_private.h > (serf_pump_io_t): New struct. > > Added: > serf/trunk/pump.c > - copied, changed from r1714992, serf/trunk/outgoing.c > Modified: > serf/trunk/incoming.c > serf/trunk/protocols/fcgi_protocol.c > serf/trunk/protocols/http2_protocol.c > serf/trunk/serf_private.h > > Modified: serf/trunk/incoming.c > URL: > http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1715005&r1=1715 > 004&r2=1715005&view=diff > ========================================================== > ==================== > --- serf/trunk/incoming.c (original) > +++ serf/trunk/incoming.c Wed Nov 18 14:45:58 2015 > @@ -28,76 +28,35 @@ > > #include "serf_private.h" > > -static apr_status_t client_detect_eof(void *baton, > - serf_bucket_t *aggregator) > -{ > - serf_incoming_t *client = baton; > - client->hit_eof = true; > - return APR_EAGAIN; > -} > - > static apr_status_t client_connected(serf_incoming_t *client) > { > /* serf_context_t *ctx = client->ctx; */ > apr_status_t status; > serf_bucket_t *ostream; > - apr_sockaddr_t *sa; > > - if (apr_socket_addr_get(&sa, APR_LOCAL, client->skt) == APR_SUCCESS) { > - char buf[48]; > - if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) > - serf_config_set_stringf(client->config, > SERF_CONFIG_CONN_LOCALIP, > - "%s:%d", buf, sa->port); > - } > - if (apr_socket_addr_get(&sa, APR_REMOTE, client->skt) == > APR_SUCCESS) { > - char buf[48]; > - if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) > - serf_config_set_stringf(client->config, > SERF_CONFIG_CONN_REMOTEIP, > - "%s:%d", buf, sa->port); > - } > + serf_pump__store_ipaddresses_in_config(&client->pump); > > serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, > "socket for client 0x%x connected\n", client); > > /* ### Connection does auth setup here */ > > - if (client->ostream_head == NULL) { > - client->ostream_head = serf_bucket_aggregate_create(client- > >allocator); > - } > - > - if (client->ostream_tail == NULL) { > - client->ostream_tail = > serf_bucket_aggregate_create(client->allocator); > - > - serf_bucket_aggregate_hold_open(client->ostream_tail, > - client_detect_eof, client); > - } > + serf_pump__prepare_setup(&client->pump); > > - ostream = client->ostream_tail; > + ostream = client->pump.ostream_tail; > > status = client->setup(client->skt, > - &client->stream, > + &client->pump.stream, > &ostream, > client->setup_baton, client->pool); > > if (status) { > - /* extra destroy here since it wasn't added to the head bucket yet. > */ > - serf_bucket_destroy(client->ostream_tail); > + serf_pump__complete_setup(&client->pump, NULL); > /* ### Cleanup! (serf__connection_pre_cleanup) */ > return status; > } > > - /* Share the configuration with all the buckets in the newly created > output > - chain (see PLAIN or ENCRYPTED scenario's), including the request buckets > - created by the application (ostream_tail will handle this for us). */ > - serf_bucket_set_config(client->ostream_head, client->config); > - > - /* Share the configuration with the ssl_decrypt and socket buckets. The > - response buckets wrapping the ssl_decrypt/socket buckets won't get the > - config automatically because they are upstream. */ > - serf_bucket_set_config(client->stream, client->config); > - > - serf_bucket_aggregate_append(client->ostream_head, > - ostream); > + serf_pump__complete_setup(&client->pump, ostream); > > if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) { > client->proto_peek_bkt = serf_bucket_aggregate_create( > @@ -105,7 +64,7 @@ static apr_status_t client_connected(ser > > serf_bucket_aggregate_append( > client->proto_peek_bkt, > - serf_bucket_barrier_create(client->stream, > + serf_bucket_barrier_create(client->pump.stream, > client->allocator)); > } > > @@ -140,7 +99,7 @@ static apr_status_t http1_enqueue_repons > void *enqueue_baton, > serf_bucket_t *bucket) > { > - serf_bucket_aggregate_append(request->incoming->ostream_tail, > + serf_bucket_aggregate_append(request->incoming- > >pump.ostream_tail, > serf__bucket_event_create(bucket, > request, > NULL, > @@ -194,7 +153,7 @@ apr_status_t perform_peek_protocol(serf_ > > if (!peek_data) { > > - status = serf_bucket_peek(client->stream, &data, &len); > + status = serf_bucket_peek(client->pump.stream, &data, &len); > > if (len > h2prefixlen) > len = h2prefixlen; > @@ -227,7 +186,7 @@ apr_status_t perform_peek_protocol(serf_ > } > > do { > - status = serf_bucket_read(client->stream, > + status = serf_bucket_read(client->pump.stream, > h2prefixlen - peek_data->read, > &data, &len); > > @@ -314,7 +273,7 @@ static apr_status_t read_from_client(ser > client->proto_peek_bkt = NULL; > } > else > - read_bkt = serf_bucket_barrier_create(client->stream, > + read_bkt = serf_bucket_barrier_create(client->pump.stream, > client->allocator); > > status = client->req_setup(&rq->req_bkt, read_bkt, rq, > @@ -355,7 +314,7 @@ static apr_status_t read_from_client(ser > const char *data; > apr_size_t len; > > - status = serf_bucket_peek(client->stream, &data, &len); > + status = serf_bucket_peek(client->pump.stream, &data, &len); > } > } > } > @@ -390,144 +349,10 @@ static apr_status_t read_from_client(ser > return status; > } > > -static apr_status_t socket_writev(serf_incoming_t *client) > -{ > - apr_size_t written; > - apr_status_t status; > - > - status = apr_socket_sendv(client->skt, client->vec, > - client->vec_len, &written); > - if (status && !APR_STATUS_IS_EAGAIN(status)) > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, > - "socket_sendv error %d\n", status); > - > - /* did we write everything? */ > - if (written) { > - apr_size_t len = 0; > - int i; > - > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, > - "--- socket_sendv: %d bytes. --\n", written); > - > - for (i = 0; i < client->vec_len; i++) { > - len += client->vec[i].iov_len; > - if (written < len) { > - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client- > >config, > - "%.*s", client->vec[i].iov_len - (len - > written), > - client->vec[i].iov_base); > - if (i) { > - memmove(client->vec, &client->vec[i], > - sizeof(struct iovec) * (client->vec_len - i)); > - client->vec_len -= i; > - } > - client->vec[0].iov_base = (char *)client->vec[0].iov_base + > (client- > >vec[0].iov_len - (len - written)); > - client->vec[0].iov_len = len - written; > - break; > - } else { > - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client- > >config, > - "%.*s", > - client->vec[i].iov_len, > client->vec[i].iov_base); > - } > - } > - if (len == written) { > - client->vec_len = 0; > - } > - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client- > >config, "\n"); > - > - /* Log progress information */ > - serf__context_progress_delta(client->ctx, 0, written); > - } > - > - return status; > -} > - > -static apr_status_t no_more_writes(serf_incoming_t *client) > -{ > - /* Note that we should hold new requests until we open our new socket. > */ > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, > - "stop writing on client 0x%x\n", client); > - > - /* Clear our iovec. */ > - client->vec_len = 0; > - > - /* Update the pollset to know we don't want to write on this socket any > - * more. > - */ > - serf_io__set_pollset_dirty(&client->io); > - return APR_SUCCESS; > -} > - > apr_status_t serf__incoming_client_flush(serf_incoming_t *client, > bool pump) > { > - apr_status_t status = APR_SUCCESS; > - apr_status_t read_status = APR_SUCCESS; > - serf_bucket_t *ostreamh = client->ostream_head; > - > - client->hit_eof = FALSE; > - > - while (status == APR_SUCCESS) { > - > - /* First try to write out what is already stored in the > - connection vecs. */ > - while (client->vec_len && !status) { > - status = socket_writev(client); > - > - /* If the write would have blocked, then we're done. > - * Don't try to write anything else to the socket. > - */ > - if (APR_STATUS_IS_EPIPE(status) > - || APR_STATUS_IS_ECONNRESET(status) > - || APR_STATUS_IS_ECONNABORTED(status)) > - return no_more_writes(client); > - } > - > - if (status || !pump) > - return status; > - else if (read_status || client->vec_len || client->hit_eof) > - return read_status; > - > - /* ### optimize at some point by using read_for_sendfile */ > - /* TODO: now that read_iovec will effectively try to return as much > - data as available, we probably don't want to read ALL_AVAIL, but > - a lower number, like the size of one or a few TCP packets, the > - available TCP buffer size ... */ > - client->hit_eof = 0; > - read_status = serf_bucket_read_iovec(ostreamh, > - SERF_READ_ALL_AVAIL, > - IOV_MAX, > - client->vec, > - &client->vec_len); > - > - if (read_status == SERF_ERROR_WAIT_CONN) { > - /* The bucket told us that it can't provide more data until > - more data is read from the socket. This normally happens > - during a SSL handshake. > - > - We should avoid looking for writability for a while so > - that (hopefully) something will appear in the bucket so > - we can actually write something. otherwise, we could > - end up in a CPU spin: socket wants something, but we > - don't have anything (and keep returning EAGAIN) */ > - client->stop_writing = true; > - serf_io__set_pollset_dirty(&client->io); > - > - read_status = APR_EAGAIN; > - } > - else if (APR_STATUS_IS_EAGAIN(read_status)) { > - > - /* We read some stuff, but did we read everything ? */ > - if (client->hit_eof) > - read_status = APR_SUCCESS; > - } > - else if (SERF_BUCKET_READ_ERROR(read_status)) { > - > - /* Something bad happened. Propagate any errors. */ > - return read_status; > - } > - } > - > - return status; > + return serf_pump__write(&client->pump, pump); > } > > static apr_status_t write_to_client(serf_incoming_t *client) > @@ -561,7 +386,7 @@ void serf_incoming_set_framing_type( > > if (client->skt) { > serf_io__set_pollset_dirty(&client->io); > - client->stop_writing = 0; > + client->pump.stop_writing = false; > > /* Close down existing protocol */ > if (client->protocol_baton && client->perform_teardown) { > @@ -745,10 +570,16 @@ apr_status_t serf_incoming_create2( > ic->closed = closed; > ic->closed_baton = closed_baton; > > - /* A bucket wrapped around our socket (for reading responses). */ > - ic->stream = NULL; > - ic->ostream_head = NULL; > - ic->ostream_tail = NULL; > + /* Store the connection specific info in the configuration store */ > + rv = serf__config_store_get_client_config(ctx, ic, &config, pool); > + if (rv) { > + apr_pool_destroy(ic->pool); > + return rv; > + } > + ic->config = config; > + > + /* Prepare wrapping the socket with buckets. */ > + serf_pump__init(&ic->pump, &ic->io, ic->skt, config, ic->allocator, ic- > >pool); > > ic->protocol_baton = NULL; > ic->perform_read = read_from_client; > @@ -762,14 +593,6 @@ apr_status_t serf_incoming_create2( > ic->desc.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; > ic->seen_in_pollset = 0; > > - /* Store the connection specific info in the configuration store */ > - rv = serf__config_store_get_client_config(ctx, ic, &config, pool); > - if (rv) { > - apr_pool_destroy(ic->pool); > - return rv; > - } > - ic->config = config; > - > rv = ctx->pollset_add(ctx->pollset_baton, > &ic->desc, &ic->io); > > @@ -928,38 +751,8 @@ apr_status_t serf__incoming_update_polls > But it also has the nice side effect of removing references > from the aggregate to requests that are done. > */ > - if (client->vec_len) { > - /* We still have vecs in the connection, which lifetime is > - managed by buckets inside client->ostream_head. > - > - Don't touch ostream as that might destroy the vecs */ > - > - data_waiting = true; > - } > - else { > - serf_bucket_t *ostream; > - > - ostream = client->ostream_head; > > - if (ostream) { > - const char *dummy_data; > - apr_size_t len; > - > - status = serf_bucket_peek(ostream, &dummy_data, &len); > - > - if (SERF_BUCKET_READ_ERROR(status) || len > 0) { > - /* DATA or error waiting */ > - data_waiting = TRUE; /* Error waiting */ > - } > - else if (! status || APR_STATUS_IS_EOF(status)) { > - data_waiting = FALSE; > - } > - else > - data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */ > - } > - else > - data_waiting = FALSE; > - } > + data_waiting = serf_pump__data_pending(&client->pump); > > if (data_waiting) { > desc.reqevents |= APR_POLLOUT; > > Modified: serf/trunk/protocols/fcgi_protocol.c > URL: > http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=171 > 5005&r1=1715004&r2=1715005&view=diff > ========================================================== > ==================== > --- serf/trunk/protocols/fcgi_protocol.c (original) > +++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 14:45:58 2015 > @@ -562,8 +562,8 @@ static apr_status_t fcgi_server_read(ser > serf_fcgi_protocol_t *fcgi = client->protocol_baton; > > if (! fcgi->stream) { > - fcgi->stream = client->stream; > - fcgi->ostream = client->ostream_tail; > + fcgi->stream = client->pump.stream; > + fcgi->ostream = client->pump.ostream_tail; > } > > return fcgi_read(fcgi); > @@ -574,8 +574,8 @@ static apr_status_t fcgi_server_write(se > serf_fcgi_protocol_t *fcgi = client->protocol_baton; > > if (!fcgi->stream) { > - fcgi->stream = client->stream; > - fcgi->ostream = client->ostream_tail; > + fcgi->stream = client->pump.stream; > + fcgi->ostream = client->pump.ostream_tail; > } > > return fcgi_write(fcgi); > @@ -606,8 +606,8 @@ void serf__fcgi_protocol_init_server(ser > fcgi->pool = protocol_pool; > fcgi->client = client; > fcgi->io = &client->io; > - fcgi->stream = client->stream; > - fcgi->ostream = client->ostream_tail; > + fcgi->stream = client->pump.stream; > + fcgi->ostream = client->pump.ostream_tail; > fcgi->allocator = client->allocator; > fcgi->config = client->config; > > > Modified: serf/trunk/protocols/http2_protocol.c > URL: > http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1 > 715005&r1=1715004&r2=1715005&view=diff > ========================================================== > ==================== > --- serf/trunk/protocols/http2_protocol.c (original) > +++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 14:45:58 2015 > @@ -331,8 +331,8 @@ void serf__http2_protocol_init_server(se > h2->pool = protocol_pool; > h2->client = client; > h2->io = &client->io; > - h2->stream = client->stream; > - h2->ostream = client->ostream_tail; > + h2->stream = client->pump.stream; > + h2->ostream = client->pump.ostream_tail; > h2->allocator = client->allocator; > h2->config = client->config; > > @@ -1728,9 +1728,9 @@ http2_incoming_read(serf_incoming_t *cli > > /* If the stop_writing flag was set on the connection, reset it now > because > there is some data to read. */ > - if (client->stop_writing) > + if (client->pump.stop_writing) > { > - client->stop_writing = 0; > + client->pump.stop_writing = false; > serf_io__set_pollset_dirty(&client->io); > } > > @@ -1740,7 +1740,7 @@ http2_incoming_read(serf_incoming_t *cli > if (client->proto_peek_bkt) > stream = client->proto_peek_bkt; > else > - stream = client->stream; > + stream = client->pump.stream; > > do { > const char *data; > @@ -1766,7 +1766,7 @@ http2_incoming_read(serf_incoming_t *cli > serf_bucket_destroy(client->proto_peek_bkt); > client->proto_peek_bkt = NULL; > > - h2->stream = client->stream; > + h2->stream = client->pump.stream; > } > > if (APR_STATUS_IS_EAGAIN(status) || status == > SERF_ERROR_WAIT_CONN) > > Copied: serf/trunk/pump.c (from r1714992, serf/trunk/outgoing.c) > URL: > http://svn.apache.org/viewvc/serf/trunk/pump.c?p2=serf/trunk/pump.c&p > 1=serf/trunk/outgoing.c&r1=1714992&r2=1715005&rev=1715005&view=diff > ========================================================== > ==================== > --- serf/trunk/outgoing.c (original) > +++ serf/trunk/pump.c Wed Nov 18 14:45:58 2015 > @@ -29,353 +29,111 @@ > > #include "serf_private.h" > > -/* forward definitions */ > -static apr_status_t read_from_connection(serf_connection_t *conn); > -static apr_status_t write_to_connection(serf_connection_t *conn); > -static apr_status_t hangup_connection(serf_connection_t *conn); > - > -#define REQS_IN_PROGRESS(conn) \ > - ((conn)->completed_requests - (conn)->completed_responses) > - > -/* cleanup for sockets */ > -static apr_status_t clean_skt(void *data) > +apr_status_t pump_cleanup(void *baton) > { > - serf_connection_t *conn = data; > - apr_status_t status = APR_SUCCESS; > + serf_pump_t *pump = baton; > > - if (conn->skt) { > - status = apr_socket_close(conn->skt); > - conn->skt = NULL; > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > - "closed socket, status %d\n", status); > - serf_config_remove_value(conn->config, > SERF_CONFIG_CONN_LOCALIP); > - serf_config_remove_value(conn->config, > SERF_CONFIG_CONN_REMOTEIP); > + if (pump->ostream_head != NULL) { > +#ifdef SERF_DEBUG_BUCKET_USE > + serf__bucket_drain(conn->ostream_head); > +#endif > + serf_bucket_destroy(pump->ostream_head); > + pump->ostream_head = NULL; > + pump->ostream_tail = NULL; > } > > - return status; > + return APR_SUCCESS; > } > > -/* cleanup for conns */ > -static apr_status_t clean_conn(void *data) > -{ > - serf_connection_t *conn = data; > - > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > - "cleaning up connection 0x%x\n", conn); > - serf_connection_close(conn); > +void serf_pump__init(serf_pump_t *pump, > + serf_io_baton_t *io, > + apr_socket_t *skt, > + serf_config_t *config, > + serf_bucket_alloc_t *allocator, > + apr_pool_t *pool) > +{ > + memset(pump, 0, sizeof(*pump)); > + > + pump->io = io; > + pump->allocator = allocator; > + pump->config = config; > + pump->skt = skt; > > - return APR_SUCCESS; > + apr_pool_cleanup_register(pool, pump, pump_cleanup, > + apr_pool_cleanup_null); > } > > /* Safely check if there is still data pending on the connection, carefull > to not accidentally make it invalid. */ > -static int > -data_pending(serf_connection_t *conn) > +bool serf_pump__data_pending(serf_pump_t *pump) > { > - if (conn->vec_len > 0) > + if (pump->vec_len > 0) > return TRUE; /* We can't poll right now! */ > > - if (conn->ostream_head) { > - const char *dummy; > + if (pump->ostream_head) { > + const char *data; > apr_size_t len; > apr_status_t status; > > - status = serf_bucket_peek(conn->ostream_head, &dummy, > - &len); > + status = serf_bucket_peek(pump->ostream_head, &data, &len); > if (!SERF_BUCKET_READ_ERROR(status)) { > if (len > 0) { > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn- > >config, > + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump- > >config, > "Extra data to be written after sending complete " > "requests.\n"); > - return TRUE; > + return true; > } > } > else > - return TRUE; /* Sure, we have data (an error) */ > - } > - > - return FALSE; > -} > - > -static int > -request_pending(serf_request_t **next_req, serf_connection_t *conn) > -{ > - /* Prepare the next request */ > - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_NONE > - && (conn->pipelining || (!conn->pipelining && > REQS_IN_PROGRESS(conn) == 0))) > - { > - /* Skip all requests that have been written completely but we're > still > - waiting for a response. */ > - serf_request_t *request = conn->unwritten_reqs; > - > - if (next_req) > - *next_req = request; > - > - if (request != NULL) { > - return TRUE; > - } > + return true; /* Sure, we have data (an error) */ > } > - else if (next_req) > - *next_req = NULL; > - > - return FALSE; > -} > - > -/* Check if there is data waiting to be sent over the socket. This can happen > - in two situations: > - - The connection queue has atleast one request with unwritten data. > - - All requests are written and the ssl layer wrote some data while reading > - the response. This can happen when the server triggers a renegotiation, > - e.g. after the first and only request on that connection was received. > - Returns 1 if data is pending on CONN, NULL if not. > - If NEXT_REQ is not NULL, it will be filled in with the next available > request > - with unwritten data. */ > -static int > -request_or_data_pending(serf_request_t **next_req, serf_connection_t > *conn) > -{ > - if (request_pending(next_req, conn)) > - return TRUE; > - > - return data_pending(conn); > -} > - > -/* Update the pollset for this connection. We tweak the pollset based on > - * whether we want to read and/or write, given conditions within the > - * connection. If the connection is not (yet) in the pollset, then it > - * will be added. > - */ > -apr_status_t serf__conn_update_pollset(serf_connection_t *conn) > -{ > - serf_context_t *ctx = conn->ctx; > - apr_status_t status; > - apr_pollfd_t desc = { 0 }; > - int data_waiting; > > - if (!conn->skt) { > - return APR_SUCCESS; > - } > - > - /* Remove the socket from the poll set. */ > - desc.desc_type = APR_POLL_SOCKET; > - desc.desc.s = conn->skt; > - desc.reqevents = conn->io.reqevents; > - > - status = ctx->pollset_rm(ctx->pollset_baton, > - &desc, &conn->io); > - if (status && !APR_STATUS_IS_NOTFOUND(status)) > - return status; > - > - /* Now put it back in with the correct read/write values. */ > - desc.reqevents = APR_POLLHUP | APR_POLLERR; > - > - /* If we are not connected yet, we just want to know when we are */ > - if (conn->wait_for_connect) { > - data_waiting = TRUE; > - desc.reqevents |= APR_POLLOUT; > - } > - else { > - /* Directly look at the connection data. While this may look > - more expensive than the cheap checks later this peek is > - just checking a bit of ram. > - > - But it also has the nice side effect of removing references > - from the aggregate to requests that are done. > - */ > - if (conn->vec_len) { > - /* We still have vecs in the connection, which lifetime is > - managed by buckets inside conn->ostream_head. > - > - Don't touch ostream as that might destroy the vecs */ > - > - data_waiting = (conn->state != SERF_CONN_CLOSING); > - } > - else { > - serf_bucket_t *ostream; > - > - ostream = conn->ostream_head; > - > - if (!ostream) > - ostream = conn->ssltunnel_ostream; > - > - if (ostream) { > - const char *dummy_data; > - apr_size_t len; > - > - status = serf_bucket_peek(ostream, &dummy_data, &len); > - > - if (SERF_BUCKET_READ_ERROR(status) || len > 0) { > - /* DATA or error waiting */ > - data_waiting = TRUE; /* Error waiting */ > - } > - else if (! status || APR_STATUS_IS_EOF(status)) { > - data_waiting = FALSE; > - } > - else > - data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */ > - } > - else > - data_waiting = FALSE; > - } > - > - if (data_waiting) { > - desc.reqevents |= APR_POLLOUT; > - } > - } > - > - if ((conn->written_reqs || conn->unwritten_reqs) && > - conn->state != SERF_CONN_INIT) { > - /* If there are any outstanding events, then we want to read. */ > - /* ### not true. we only want to read IF we have sent some data */ > - desc.reqevents |= APR_POLLIN; > - > - /* Don't write if OpenSSL told us that it needs to read data first. > */ > - if (! conn->stop_writing && !data_waiting) { > - > - /* This check is duplicated in write_to_connection() */ > - if ((conn->probable_keepalive_limit && > - conn->completed_requests > conn->probable_keepalive_limit) > || > - (conn->max_outstanding_requests && > - REQS_IN_PROGRESS(conn) >= conn->max_outstanding_requests)) > { > - > - /* we wouldn't try to write any way right now. */ > - } > - else if (request_pending(NULL, conn)) { > - desc.reqevents |= APR_POLLOUT; > - } > - } > - } > - > - /* If we can have async responses, always look for something to read. */ > - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_HTTP1 > - || conn->async_responses) > - { > - desc.reqevents |= APR_POLLIN; > - } > - > - /* save our reqevents, so we can pass it in to remove later. */ > - conn->io.reqevents = desc.reqevents; > - > - /* Note: even if we don't want to read/write this socket, we still > - * want to poll it for hangups and errors. > - */ > - return ctx->pollset_add(ctx->pollset_baton, > - &desc, &conn->io); > + return false; > } > > -#ifdef SERF_DEBUG_BUCKET_USE > - > -/* Make sure all response buckets were drained. */ > -static void check_buckets_drained(serf_connection_t *conn) > +static apr_status_t detect_eof(void *baton, serf_bucket_t > *aggregate_bucket) > { > - serf_request_t *request = conn->written_reqs; > + serf_pump_t *pump = baton; > + pump->hit_eof = true; > > - for ( ; request ; request = request->next ) { > - if (request->resp_bkt != NULL) { > - /* ### crap. can't do this. this allocator may have un-drained > - * ### REQUEST buckets. > - */ > - /* serf_debug__entered_loop(request->resp_bkt->allocator); */ > - /* ### for now, pretend we closed the conn (resets the tracking) > */ > - serf_debug__closed_conn(request->resp_bkt->allocator); > - } > + if (pump->done_writing) { > + pump->ostream_tail = NULL; > + return APR_EOF; > } > + else > + return APR_EAGAIN; > } > > -#endif > - > -/* Destroys all outstanding write information, to allow cleanup of subpools > - that may still have data in these buckets to continue */ > -void serf__connection_pre_cleanup(serf_connection_t *conn) > +void serf_pump__prepare_setup(serf_pump_t *pump) > { > - serf_request_t *rq; > - conn->vec_len = 0; > - > - if (conn->ostream_head != NULL) { > -#ifdef SERF_DEBUG_BUCKET_USE > - serf__bucket_drain(conn->ostream_head); > -#endif > - serf_bucket_destroy(conn->ostream_head); > - conn->ostream_head = NULL; > - conn->ostream_tail = NULL; > - } > - if (conn->ssltunnel_ostream != NULL) { > - serf_bucket_destroy(conn->ssltunnel_ostream); > - conn->ssltunnel_ostream = NULL; > + if (pump->ostream_head == NULL) { > + pump->ostream_head = serf_bucket_aggregate_create(pump- > >allocator); > } > > - /* Tell all written request that they are free to destroy themselves */ > - rq = conn->written_reqs; > - while (rq != NULL) { > - if (rq->writing == SERF_WRITING_STARTED > - || rq->writing == SERF_WRITING_DONE) { > + if (pump->ostream_tail == NULL) { > + pump->ostream_tail = serf_bucket_aggregate_create(pump- > >allocator); > > - rq->writing = SERF_WRITING_FINISHED; > - } > - rq = rq->next; > + serf_bucket_aggregate_hold_open(pump->ostream_tail, detect_eof, > pump); > } > - > - /* Destroy the requests that were queued up to destroy later */ > - while ((rq = conn->done_reqs)) { > - conn->done_reqs = rq->next; > - > - rq->writing = SERF_WRITING_FINISHED; > - serf__destroy_request(rq); > - } > - conn->done_reqs = conn->done_reqs_tail = NULL; > } > > -static apr_status_t detect_eof(void *baton, serf_bucket_t > *aggregate_bucket) > +void serf_pump__complete_setup(serf_pump_t *pump, > + serf_bucket_t *ostream) > { > - serf_connection_t *conn = baton; > - conn->hit_eof = 1; > - return APR_EAGAIN; > -} > - > -static apr_status_t do_conn_setup(serf_connection_t *conn) > -{ > - apr_status_t status; > - serf_bucket_t *ostream; > - > - /* ### dunno what the hell this is about. this latency stuff got > - ### added, and who knows whether it should stay... */ > - conn->latency = apr_time_now() - conn->connect_time; > - > - if (conn->ostream_head == NULL) { > - conn->ostream_head = serf_bucket_aggregate_create(conn- > >allocator); > - } > - > - if (conn->ostream_tail == NULL) { > - conn->ostream_tail = serf_bucket_aggregate_create(conn->allocator); > - > - serf_bucket_aggregate_hold_open(conn->ostream_tail, detect_eof, > conn); > - } > - > - ostream = conn->ostream_tail; > - > - status = (*conn->setup)(conn->skt, > - &conn->stream, > - &ostream, > - conn->setup_baton, > - conn->pool); > - if (status) { > - /* extra destroy here since it wasn't added to the head bucket yet. > */ > - serf_bucket_destroy(conn->ostream_tail); > - serf__connection_pre_cleanup(conn); > - return status; > - } > + if (ostream) > + serf_bucket_aggregate_append(pump->ostream_head, ostream); > + else > + serf_bucket_aggregate_append(pump->ostream_head, pump- > >ostream_tail); > > /* Share the configuration with all the buckets in the newly created > output > chain (see PLAIN or ENCRYPTED scenario's), including the request buckets > created by the application (ostream_tail will handle this for us). */ > - serf_bucket_set_config(conn->ostream_head, conn->config); > + serf_bucket_set_config(pump->ostream_head, pump->config); > > /* Share the configuration with the ssl_decrypt and socket buckets. The > response buckets wrapping the ssl_decrypt/socket buckets won't get the > config automatically because they are upstream. */ > - serf_bucket_set_config(conn->stream, conn->config); > - > - serf_bucket_aggregate_append(conn->ostream_head, > - ostream); > + serf_bucket_set_config(pump->stream, pump->config); > > /* We typically have one of two scenarios, based on whether the > application decided to encrypt this connection: > @@ -394,381 +152,53 @@ static apr_status_t do_conn_setup(serf_c > > where STREAM is an internal variant of AGGREGATE. > */ > - > - return status; > } > > -/* Set up the input and output stream buckets. > - When a tunnel over an http proxy is needed, create a socket bucket and > - empty aggregate bucket for sending and receiving unencrypted requests > - over the socket. > - > - After the tunnel is there, or no tunnel was needed, ask the application > - to create the input and output buckets, which should take care of the > - [en/de]cryption. > - */ > - > -static apr_status_t prepare_conn_streams(serf_connection_t *conn, > - serf_bucket_t **ostreamt, > - serf_bucket_t **ostreamh) > +void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump) > { > - apr_status_t status; > + apr_sockaddr_t *sa; > > - /* Do we need a SSL tunnel first? */ > - if (conn->state == SERF_CONN_CONNECTED) { > - /* If the connection does not have an associated bucket, then > - * call the setup callback to get one. > - */ > - if (conn->stream == NULL) { > - status = do_conn_setup(conn); > - if (status) { > - return status; > - } > - } > - *ostreamt = conn->ostream_tail; > - *ostreamh = conn->ostream_head; > - } else if (conn->state == SERF_CONN_SETUP_SSLTUNNEL) { > - > - /* SSL tunnel needed and not set up yet, get a direct unencrypted > - stream for this socket */ > - if (conn->stream == NULL) { > - conn->stream = serf_context_bucket_socket_create(conn->ctx, > - conn->skt, > - > conn->allocator); > - } > - > - /* Don't create the ostream bucket chain including the ssl_encrypt > - bucket yet. This ensure the CONNECT request is sent unencrypted > - to the proxy. */ > - *ostreamt = *ostreamh = conn->ssltunnel_ostream; > - } else { > - /* SERF_CONN_CLOSING or SERF_CONN_INIT */ > - > - *ostreamt = conn->ostream_tail; > - *ostreamh = conn->ostream_head; > - } > - > - return APR_SUCCESS; > -} > - > -static void store_ipaddresses_in_config(serf_config_t *config, > - apr_socket_t *skt) > -{ > - apr_sockaddr_t *sa; > - > - if (apr_socket_addr_get(&sa, APR_LOCAL, skt) == APR_SUCCESS) { > + if (apr_socket_addr_get(&sa, APR_LOCAL, pump->skt) == APR_SUCCESS) > { > char buf[48]; > if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) > - serf_config_set_stringf(config, SERF_CONFIG_CONN_LOCALIP, > + serf_config_set_stringf(pump->config, > SERF_CONFIG_CONN_LOCALIP, > "%s:%d", buf, sa->port); > } > - if (apr_socket_addr_get(&sa, APR_REMOTE, skt) == APR_SUCCESS) { > + if (apr_socket_addr_get(&sa, APR_REMOTE, pump->skt) == > APR_SUCCESS) { > char buf[48]; > if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) > - serf_config_set_stringf(config, SERF_CONFIG_CONN_REMOTEIP, > + serf_config_set_stringf(pump->config, > SERF_CONFIG_CONN_REMOTEIP, > "%s:%d", buf, sa->port); > } > } > > -static apr_status_t connect_connection(serf_connection_t *conn) > -{ > - serf_context_t *ctx = conn->ctx; > - apr_status_t status; > - > - store_ipaddresses_in_config(conn->config, conn->skt); > - > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > - "socket for conn 0x%x connected\n", conn); > - > - /* If the authentication was already started on another connection, > - prepare this connection (it might be possible to skip some > - part of the handshaking). */ > - if (ctx->proxy_address) { > - status = serf__auth_setup_connection(PROXY, conn); > - if (status) { > - return status; > - } > - } > - > - status = serf__auth_setup_connection(HOST, conn); > - if (status) > - return status; > - > - /* Does this connection require a SSL tunnel over the proxy? */ > - if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == > 0) > - serf__ssltunnel_connect(conn); > - else { > - conn->state = SERF_CONN_CONNECTED; > - status = do_conn_setup(conn); > - } > - > - return APR_SUCCESS; > -} > - > -/* Create and connect sockets for any connections which don't have them > - * yet. This is the core of our lazy-connect behavior. > - */ > -apr_status_t serf__open_connections(serf_context_t *ctx) > -{ > - int i; > - > - for (i = ctx->conns->nelts; i--; ) { > - serf_connection_t *conn = GET_CONN(ctx, i); > - apr_status_t status; > - apr_socket_t *skt; > - > - conn->seen_in_pollset = 0; > - > - if (conn->skt != NULL) { > -#ifdef SERF_DEBUG_BUCKET_USE > - check_buckets_drained(conn); > -#endif > - continue; > - } > - > - /* Delay opening until we have something to deliver! */ > - if (conn->unwritten_reqs == NULL) { > - continue; > - } > - > - apr_pool_clear(conn->skt_pool); > - status = apr_socket_create(&skt, conn->address->family, > - SOCK_STREAM, > -#if APR_MAJOR_VERSION > 0 > - APR_PROTO_TCP, > -#endif > - conn->skt_pool); > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > - "created socket for conn 0x%x, status %d\n", conn, status); > - if (status != APR_SUCCESS) > - return status; > - > - apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, > - apr_pool_cleanup_null); > - > - /* Set the socket to be non-blocking */ > - if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS) > - return status; > - > - /* Disable Nagle's algorithm */ > - if ((status = apr_socket_opt_set(skt, > - APR_TCP_NODELAY, 1)) != APR_SUCCESS) > - return status; > - > - /* Configured. Store it into the connection now. */ > - conn->skt = skt; > - > - /* Remember time when we started connecting to server to calculate > - network latency. */ > - conn->connect_time = apr_time_now(); > - > - /* Now that the socket is set up, let's connect it. This should > - * return immediately. > - */ > - status = apr_socket_connect(skt, conn->address); > - if (status != APR_SUCCESS) { > - if (!APR_STATUS_IS_EINPROGRESS(status)) > - return status; > - > - /* Keep track of when we really connect */ > - conn->wait_for_connect = TRUE; > - } > - > - status = serf_config_set_string(conn->config, > - SERF_CONFIG_CONN_PIPELINING, > - (conn->max_outstanding_requests != 1 && > - conn->pipelining == 1) ? "Y" : "N"); > - if (status) > - return status; > - > - /* Flag our pollset as dirty now that we have a new socket. */ > - serf_io__set_pollset_dirty(&conn->io); > - > - if (! conn->wait_for_connect) { > - status = connect_connection(conn); > - > - if (status) > - return status; > - } > - } > - > - return APR_SUCCESS; > -} > - > -static apr_status_t no_more_writes(serf_connection_t *conn) > +static apr_status_t no_more_writes(serf_pump_t *pump) > { > /* Note that we should hold new requests until we open our new socket. > */ > - conn->state = SERF_CONN_CLOSING; > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > - "stop writing on conn 0x%x\n", conn); > + pump->done_writing = true; > + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config, > + "stop writing on 0x%x\n", pump->io->u.conn); > > /* Clear our iovec. */ > - conn->vec_len = 0; > + pump->vec_len = 0; > > /* Update the pollset to know we don't want to write on this socket any > * more. > */ > - serf_io__set_pollset_dirty(&conn->io); > - return APR_SUCCESS; > -} > - > -/* Read the 'Connection' header from the response. Return > SERF_ERROR_CLOSING if > - * the header contains value 'close' indicating the server is closing the > - * connection right after this response. > - * Otherwise returns APR_SUCCESS. > - */ > -static apr_status_t is_conn_closing(serf_bucket_t *response) > -{ > - serf_bucket_t *hdrs; > - const char *val; > - > - hdrs = serf_bucket_response_get_headers(response); > - val = serf_bucket_headers_get(hdrs, "Connection"); > - if (val && strcasecmp("close", val) == 0) > - { > - return SERF_ERROR_CLOSING; > - } > - > - return APR_SUCCESS; > -} > - > -static apr_status_t remove_connection(serf_context_t *ctx, > - serf_connection_t *conn) > -{ > - apr_pollfd_t desc = { 0 }; > - > - desc.desc_type = APR_POLL_SOCKET; > - desc.desc.s = conn->skt; > - desc.reqevents = conn->io.reqevents; > - > - return ctx->pollset_rm(ctx->pollset_baton, > - &desc, &conn->io); > -} > - > -/* A socket was closed, inform the application. */ > -static void handle_conn_closed(serf_connection_t *conn, apr_status_t > status) > -{ > - (*conn->closed)(conn, conn->closed_baton, status, > - conn->pool); > -} > - > -static apr_status_t reset_connection(serf_connection_t *conn, > - int requeue_requests) > -{ > - serf_context_t *ctx = conn->ctx; > - apr_status_t status; > - serf_request_t *old_reqs; > - > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > - "reset connection 0x%x\n", conn); > - > - conn->probable_keepalive_limit = conn->completed_responses; > - conn->completed_requests = 0; > - conn->completed_responses = 0; > - > - /* Clear the unwritten_reqs queue, so the application can requeue > cancelled > - requests on it for the new socket. */ > - old_reqs = conn->unwritten_reqs; > - conn->unwritten_reqs = NULL; > - conn->unwritten_reqs_tail = NULL; > - > - serf__connection_pre_cleanup(conn); > - > - /* First, cancel all written requests for which we haven't received a > - response yet. Inform the application that the request is cancelled, > - so it can requeue them if needed. */ > - while (conn->written_reqs) { > - serf__cancel_request(conn->written_reqs, &conn->written_reqs, > - requeue_requests); > - } > - conn->written_reqs_tail = NULL; > - > - /* Handle all outstanding unwritten requests. > - TODO: what about a partially written request? */ > - while (old_reqs) { > - /* If we haven't started to write the connection, bring it over > - * unchanged to our new socket. > - * Do not copy a CONNECT request to the new connection, the ssl > tunnel > - * setup code will create a new CONNECT request already. > - */ > - if (requeue_requests && (old_reqs->writing == SERF_WRITING_NONE) > && > - !old_reqs->ssltunnel) { > - > - serf_request_t *req = old_reqs; > - old_reqs = old_reqs->next; > - req->next = NULL; > - serf__link_requests(&conn->unwritten_reqs, > - &conn->unwritten_reqs_tail, > - req); > - } > - else { > - /* We don't want to requeue the request or this request was > partially > - written. Inform the application that the request is > cancelled. */ > - serf__cancel_request(old_reqs, &old_reqs, requeue_requests); > - } > - } > - > - /* Requests queue has been prepared for a new socket, close the old > one. */ > - if (conn->skt != NULL) { > - remove_connection(ctx, conn); > - status = clean_skt(conn); > - if (conn->closed != NULL) { > - handle_conn_closed(conn, status); > - } > - } > - > - if (conn->stream != NULL) { > - serf_bucket_destroy(conn->stream); > - conn->stream = NULL; > - } > - > - /* Don't try to resume any writes */ > - conn->vec_len = 0; > - > - serf_io__set_pollset_dirty(&conn->io); > - conn->state = SERF_CONN_INIT; > - > - conn->hit_eof = 0; > - conn->connect_time = 0; > - conn->latency = -1; > - conn->stop_writing = 0; > - conn->write_now = 0; > - /* conn->pipelining */ > - > - conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1; > - > - if (conn->protocol_baton) { > - conn->perform_teardown(conn); > - conn->protocol_baton = NULL; > - } > - > - conn->perform_read = read_from_connection; > - conn->perform_write = write_to_connection; > - conn->perform_hangup = hangup_connection; > - conn->perform_teardown = NULL; > - > - conn->status = APR_SUCCESS; > - > - /* Let our context know that we've 'reset' the socket already. */ > - conn->seen_in_pollset |= APR_POLLHUP; > - > - /* Recalculate the current list length */ > - conn->nr_of_written_reqs = 0; > - conn->nr_of_unwritten_reqs = serf__req_list_length(conn- > >unwritten_reqs); > - > - /* Found the connection. Closed it. All done. */ > + serf_io__set_pollset_dirty(pump->io); > return APR_SUCCESS; > } > > -static apr_status_t socket_writev(serf_connection_t *conn) > +static apr_status_t socket_writev(serf_pump_t *pump) > { > apr_size_t written; > apr_status_t status; > + serf_pump_t *conn = pump; > > - status = apr_socket_sendv(conn->skt, conn->vec, > - conn->vec_len, &written); > + status = apr_socket_sendv(pump->skt, pump->vec, > + pump->vec_len, &written); > if (status && !APR_STATUS_IS_EAGAIN(status)) > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump- > >config, > "socket_sendv error %d\n", status); > > /* did we write everything? */ > @@ -805,18 +235,18 @@ static apr_status_t socket_writev(serf_c > serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, > "\n"); > > /* Log progress information */ > - serf__context_progress_delta(conn->ctx, 0, written); > + serf__context_progress_delta(conn->io->ctx, 0, written); > } > > return status; > } > > -apr_status_t serf__connection_flush(serf_connection_t *conn, > - int pump) > +apr_status_t serf_pump__write(serf_pump_t *pump, > + bool fetch_new) > { > apr_status_t status = APR_SUCCESS; > apr_status_t read_status = APR_SUCCESS; > - serf_bucket_t *ostreamh = NULL; > + serf_pump_t *const conn = pump; > > conn->hit_eof = FALSE; > > @@ -841,23 +271,13 @@ apr_status_t serf__connection_flush(serf > else if (read_status || conn->vec_len || conn->hit_eof) > return read_status; > > - /* Ok, with the vecs written, we can now refill the per connection > - output vecs */ > - if (!ostreamh) { > - serf_bucket_t *ostreamt; > - > - status = prepare_conn_streams(conn, &ostreamt, &ostreamh); > - if (status) > - return status; > - } > - > /* ### optimize at some point by using read_for_sendfile */ > /* TODO: now that read_iovec will effectively try to return as much > data as available, we probably don't want to read ALL_AVAIL, but > a lower number, like the size of one or a few TCP packets, the > available TCP buffer size ... */ > conn->hit_eof = 0; > - read_status = serf_bucket_read_iovec(ostreamh, > + read_status = serf_bucket_read_iovec(pump->ostream_head, > SERF_READ_ALL_AVAIL, > IOV_MAX, > conn->vec, > @@ -873,8 +293,8 @@ apr_status_t serf__connection_flush(serf > we can actually write something. otherwise, we could > end up in a CPU spin: socket wants something, but we > don't have anything (and keep returning EAGAIN) */ > - conn->stop_writing = 1; > - serf_io__set_pollset_dirty(&conn->io); > + conn->stop_writing = true; > + serf_io__set_pollset_dirty(conn->io); > > read_status = APR_EAGAIN; > } > @@ -894,936 +314,3 @@ apr_status_t serf__connection_flush(serf > return status; > } > > -/* Implements serf_bucket_event_callback_t and is called (potentially > - more than once) after the request buckets are completely read. > - > - At this time we know the request is written, but we can't destroy > - the buckets yet as they might still be referenced by the connection > - vecs. */ > -static apr_status_t request_writing_done(void *baton, > - apr_uint64_t bytes_read) > -{ > - serf_request_t *request = baton; > - > - if (request->writing == SERF_WRITING_STARTED) { > - request->writing = SERF_WRITING_DONE; > - > - /* TODO: Handle request done */ > - } > - return APR_EOF; /* Done with the event bucket */ > -} > - > - > -/* Implements serf_bucket_event_callback_t and is called after the > - request buckets are no longer needed. More precisely the outgoing > - buckets are already destroyed. */ > -static apr_status_t request_writing_finished(void *baton, > - apr_uint64_t bytes_read) > -{ > - serf_request_t *request = baton; > - serf_connection_t *conn = request->conn; > - > - request->req_bkt = NULL; /* Bucket is destroyed by now */ > - > - if (request->writing == SERF_WRITING_DONE) { > - request->writing = SERF_WRITING_FINISHED; > - > - /* Move the request to the written queue */ > - serf__link_requests(&conn->written_reqs, &conn->written_reqs_tail, > - request); > - conn->nr_of_written_reqs++; > - conn->unwritten_reqs = conn->unwritten_reqs->next; > - conn->nr_of_unwritten_reqs--; > - request->next = NULL; > - > - /* If our connection has async responses enabled, we're not > - * going to get a reply back, so kill the request. > - */ > - if (conn->async_responses) { > - conn->unwritten_reqs = request->next; > - conn->nr_of_unwritten_reqs--; > - serf__destroy_request(request); > - } > - > - conn->completed_requests++; > - } > - /* Destroy (all) requests that are now safe to destroy, > - Typically non or just the finished one */ > - { > - serf_request_t *last = NULL; > - serf_request_t **rq = &conn->done_reqs; > - while (*rq) { > - request = *rq; > - if ((*rq)->writing == SERF_WRITING_FINISHED) { > - request = *rq; > - *rq = request->next; > - serf__destroy_request(request); > - } > - else { > - last = *rq; > - rq = &last->next; > - } > - } > - > - conn->done_reqs_tail = last; > - } > - > - return APR_EOF; /* Done with event bucket. Status is ignored */ > -} > - > -/* write data out to the connection */ > -static apr_status_t write_to_connection(serf_connection_t *conn) > -{ > - /* Keep reading and sending until we run out of stuff to read, or > - * writing would block. > - */ > - while (1) { > - serf_request_t *request; > - apr_status_t status; > - apr_status_t read_status; > - serf_bucket_t *ostreamt; > - serf_bucket_t *ostreamh; > - > - /* If we have unwritten data in iovecs, then write what we can > - directly. */ > - status = serf__connection_flush(conn, FALSE); > - if (APR_STATUS_IS_EAGAIN(status)) > - return APR_SUCCESS; > - else if (status) > - return status; > - > - /* If we're setting up an ssl tunnel, we can't send real requests > - as yet, as they need to be encrypted and our encrypt buckets > - aren't created yet as we still need to read the unencrypted > - response of the CONNECT request. */ > - if (conn->state == SERF_CONN_SETUP_SSLTUNNEL > - && REQS_IN_PROGRESS(conn) > 0) > - { > - /* But flush out SSL data when necessary! */ > - status = serf__connection_flush(conn, TRUE); > - if (APR_STATUS_IS_EAGAIN(status)) > - return APR_SUCCESS; > - > - return status; > - } > - > - /* We try to limit the number of in-flight requests so that we > - don't have to repeat too many if the connection drops. > - > - This check matches that in serf__conn_update_pollset() > - */ > - if ((conn->probable_keepalive_limit && > - conn->completed_requests > conn->probable_keepalive_limit) || > - (conn->max_outstanding_requests && > - REQS_IN_PROGRESS(conn) >= conn->max_outstanding_requests)) { > - > - serf_io__set_pollset_dirty(&conn->io); > - > - /* backoff for now. */ > - return APR_SUCCESS; > - } > - > - /* We may need to move forward to a request which has something > - * to write. > - */ > - if (!request_or_data_pending(&request, conn)) { > - /* No more requests (with data) are registered with the > - * connection, and no data is pending on the outgoing stream. > - * Let's update the pollset so that we don't try to write to this > - * socket again. > - */ > - serf_io__set_pollset_dirty(&conn->io); > - return APR_SUCCESS; > - } > - > - status = prepare_conn_streams(conn, &ostreamt, &ostreamh); > - if (status) { > - return status; > - } > - > - if (request && request->writing == SERF_WRITING_NONE) { > - serf_bucket_t *event_bucket; > - > - if (request->req_bkt == NULL) { > - read_status = serf__setup_request(request); > - if (read_status) { > - /* Something bad happened. Propagate any errors. */ > - return read_status; > - } > - } > - > - request->writing = SERF_WRITING_STARTED; > - > - /* And now add an event bucket to keep track of when the request > - has been completely written */ > - event_bucket = serf__bucket_event_create(request->req_bkt, > - request, > - NULL, > - request_writing_done, > - > request_writing_finished, > - conn->allocator); > - serf_bucket_aggregate_append(ostreamt, event_bucket); > - } > - > - /* If we got some data, then deliver it. */ > - /* ### what to do if we got no data?? is that a problem? */ > - status = serf__connection_flush(conn, TRUE); > - if (APR_STATUS_IS_EAGAIN(status)) > - return APR_SUCCESS; > - else if (status) > - return status; > - > - } > - /* NOTREACHED */ > -} > - > - > - > -/* An async response message was received from the server. */ > -static apr_status_t handle_async_response(serf_connection_t *conn, > - apr_pool_t *pool) > -{ > - apr_status_t status; > - > - if (conn->current_async_response == NULL) { > - conn->current_async_response = > - (*conn->async_acceptor)(NULL, conn->stream, > - conn->async_acceptor_baton, pool); > - } > - > - status = (*conn->async_handler)(NULL, conn->current_async_response, > - conn->async_handler_baton, pool); > - > - if (APR_STATUS_IS_EOF(status)) { > - serf_bucket_destroy(conn->current_async_response); > - conn->current_async_response = NULL; > - status = APR_SUCCESS; > - } > - > - return status; > -} > - > -/* read data from the connection */ > -static apr_status_t read_from_connection(serf_connection_t *conn) > -{ > - apr_status_t status; > - apr_pool_t *tmppool; > - apr_status_t close_connection = APR_SUCCESS; > - > - /* Whatever is coming in on the socket corresponds to the first request > - * on our chain. > - */ > - serf_request_t *request = conn->written_reqs; > - if (!request) { > - /* Request wasn't completely written yet! */ > - request = conn->unwritten_reqs; > - } > - > - /* If the stop_writing flag was set on the connection, reset it now > because > - there is some data to read. */ > - if (conn->stop_writing) { > - conn->stop_writing = 0; > - serf_io__set_pollset_dirty(&conn->io); > - } > - > - /* assert: request != NULL */ > - > - if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS) > - return status; > - > - /* Invoke response handlers until we have no more work. */ > - while (1) { > - serf_bucket_t *dummy1, *dummy2; > - > - apr_pool_clear(tmppool); > - > - /* Only interested in the input stream here. */ > - status = prepare_conn_streams(conn, &dummy1, &dummy2); > - if (status) { > - goto error; > - } > - > - /* We have a different codepath when we can have async responses. */ > - if (conn->async_responses) { > - /* TODO What about socket errors? */ > - status = handle_async_response(conn, tmppool); > - if (APR_STATUS_IS_EAGAIN(status)) { > - status = APR_SUCCESS; > - goto error; > - } > - if (status) { > - goto error; > - } > - continue; > - } > - > - /* We are reading a response for a request we haven't > - * written yet! > - * > - * This shouldn't normally happen EXCEPT: > - * > - * 1) when the other end has closed the socket and we're > - * pending an EOF return. > - * 2) Doing the initial SSL handshake - we'll get EAGAIN > - * as the SSL buckets will hide the handshake from us > - * but not return any data. > - * 3) When the server sends us an SSL alert. > - * > - * In these cases, we should not receive any actual user data. > - * > - * 4) When the server sends a error response, like 408 Request > timeout. > - * This response should be passed to the application. > - * > - * If we see an EOF (due to either an expired timeout or the server > - * sending the SSL 'close notify' shutdown alert), we'll reset the > - * connection and open a new one. > - */ > - if (request->req_bkt || request->writing == SERF_WRITING_NONE) { > - const char *data; > - apr_size_t len; > - > - status = serf_bucket_peek(conn->stream, &data, &len); > - > - if (APR_STATUS_IS_EOF(status)) { > - reset_connection(conn, 1); > - status = APR_SUCCESS; > - goto error; > - } > - else if (APR_STATUS_IS_EAGAIN(status) && !len) { > - status = APR_SUCCESS; > - goto error; > - } else if (status && !APR_STATUS_IS_EAGAIN(status)) { > - /* Read error */ > - goto error; > - } > - > - /* Unexpected response from the server */ > - if (conn->write_now) { > - conn->write_now = 0; > - status = conn->perform_write(conn); > - > - if (!SERF_BUCKET_READ_ERROR(status)) > - status = APR_SUCCESS; > - } > - } > - > - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_HTTP1) > - break; > - > - /* If the request doesn't have a response bucket, then call the > - * acceptor to get one created. > - */ > - if (request->resp_bkt == NULL) { > - if (! request->acceptor) { > - /* Request wasn't even setup. > - Server replying before it received anything? */ > - return SERF_ERROR_BAD_HTTP_RESPONSE; > - } > - > - request->resp_bkt = (*request->acceptor)(request, conn->stream, > - request->acceptor_baton, > - tmppool); > - apr_pool_clear(tmppool); > - > - /* Share the configuration with the response bucket(s) */ > - serf_bucket_set_config(request->resp_bkt, conn->config); > - } > - > - status = serf__handle_response(request, tmppool); > - > - /* If we received APR_SUCCESS, run this loop again. */ > - if (!status) { > - continue; > - } > - > - /* If our response handler says it can't do anything more, we now > - * treat that as a success. > - */ > - if (APR_STATUS_IS_EAGAIN(status)) { > - /* It is possible that while reading the response, the ssl layer > - has prepared some data to send. If this was the last request, > - serf will not check for socket writability, so force this > here. > - */ > - if (request_or_data_pending(&request, conn) && !request) { > - serf_io__set_pollset_dirty(&conn->io); > - } > - status = APR_SUCCESS; > - goto error; > - } > - > - close_connection = is_conn_closing(request->resp_bkt); > - > - if (!APR_STATUS_IS_EOF(status) && > - close_connection != SERF_ERROR_CLOSING) { > - /* Whether success, or an error, there is no more to do unless > - * this request has been completed. > - */ > - goto error; > - } > - > - /* The response has been fully-read, so that means the request has > - * either been fully-delivered (most likely), or that we don't need > to > - * write the rest of it anymore, e.g. when a 408 Request timeout was > - $ received. > - * Remove it from our queue and loop to read another response. > - */ > - if (request == conn->written_reqs) { > - conn->written_reqs = request->next; > - conn->nr_of_written_reqs--; > - } else { > - conn->unwritten_reqs = request->next; > - conn->nr_of_unwritten_reqs--; > - } > - > - serf__destroy_request(request); > - > - request = conn->written_reqs; > - if (!request) { > - /* Received responses for all written requests */ > - conn->written_reqs_tail = NULL; > - /* Request wasn't completely written yet! */ > - request = conn->unwritten_reqs; > - if (!request) > - conn->unwritten_reqs_tail = NULL; > - } > - > - conn->completed_responses++; > - > - /* We have received a response. If there are no more outstanding > - requests on this connection, we should stop polling for READ > events > - for now. */ > - if (!conn->written_reqs && !conn->unwritten_reqs) { > - serf_io__set_pollset_dirty(&conn->io); > - } > - > - /* This means that we're being advised that the connection is done. > */ > - if (close_connection == SERF_ERROR_CLOSING) { > - reset_connection(conn, 1); > - if (APR_STATUS_IS_EOF(status)) > - status = APR_SUCCESS; > - goto error; > - } > - > - /* The server is suddenly deciding to serve more responses than we've > - * seen before. > - * > - * Let our requests go. > - */ > - if (conn->probable_keepalive_limit && > - conn->completed_responses >= conn->probable_keepalive_limit) { > - conn->probable_keepalive_limit = 0; > - } > - > - /* If we just ran out of requests or have unwritten requests, then > - * update the pollset. We don't want to read from this socket any > - * more. We are definitely done with this loop, too. > - */ > - if (request == NULL || request->writing == SERF_WRITING_NONE) { > - serf_io__set_pollset_dirty(&conn->io); > - status = APR_SUCCESS; > - goto error; > - } > - } > - > -error: > - /* ### This code handles some specific errors as a retry. > - Eventually we should move to a handling where the application > - can tell us if this is really a good idea for specific requests */ > - > - if (status == SERF_ERROR_SSL_NEGOTIATE_IN_PROGRESS) { > - /* This connection uses HTTP pipelining and the server asked for a > - renegotiation (e.g. to access the requested resource a specific > - client certificate is required). > - > - Because of a known problem in OpenSSL this won't work most of the > - time, so as a workaround, when the server asks for a renegotiation > - on a connection using HTTP pipelining, we reset the connection, > - disable pipelining and reconnect to the server. */ > - serf__log(LOGLVL_WARNING, LOGCOMP_CONN, __FILE__, conn- > >config, > - "The server requested renegotiation. Disable HTTP " > - "pipelining and reset the connection.\n", conn); > - > - serf__connection_set_pipelining(conn, 0); > - reset_connection(conn, 1); > - status = APR_SUCCESS; > - } > - else if (status == SERF_ERROR_REQUEST_LOST > - || APR_STATUS_IS_ECONNRESET(status) > - || APR_STATUS_IS_ECONNABORTED(status)) { > - > - /* Some systems will not generate a HUP poll event for these errors > - so we handle the ECONNRESET issue and ECONNABORT here. */ > - > - /* If the connection was ever good, be optimistic & try again. > - If it has never tried again (incl. a retry), fail. */ > - if (conn->completed_responses) { > - reset_connection(conn, 1); > - status = APR_SUCCESS; > - } > - else if (status == SERF_ERROR_REQUEST_LOST) { > - status = SERF_ERROR_ABORTED_CONNECTION; > - } > - } > - > - apr_pool_destroy(tmppool); > - return status; > -} > - > -/* The connection got reset by the server. On Windows this can happen > - when all data is read, so just cleanup the connection and open a new one. > - > - If we haven't had any successful responses on this connection, > - then error out as it is likely a server issue. */ > -static apr_status_t hangup_connection(serf_connection_t *conn) > -{ > - if (conn->completed_responses) { > - return reset_connection(conn, 1); > - } > - > - return SERF_ERROR_ABORTED_CONNECTION; > -} > - > -/* process all events on the connection */ > -static apr_status_t process_connection(serf_connection_t *conn, > - apr_int16_t events) > -{ > - apr_status_t status; > -#ifdef SERF_DEBUG_BUCKET_USE > - serf_request_t *rq; > -#endif > - > -#ifdef SERF_DEBUG_BUCKET_USE > - serf_debug__entered_loop(conn->allocator); > - > - for (rq = conn->written_reqs; rq; rq = rq->next) { > - if (rq->allocator) > - serf_debug__entered_loop(rq->allocator); > - } > - > - for (rq = conn->done_reqs; rq; rq = rq->next) { > - if (rq->allocator) > - serf_debug__entered_loop(rq->allocator); > - } > -#endif > - > - /* POLLHUP/ERR should come after POLLIN so if there's an error message > or > - * the like sitting on the connection, we give the app a chance to read > - * it before we trigger a reset condition. > - */ > - if ((events & APR_POLLIN) != 0 > - && !conn->wait_for_connect) { > - > - if ((status = conn->perform_read(conn)) != APR_SUCCESS) > - return status; > - > - /* If we decided to reset our connection, return now as we don't > - * want to write. > - */ > - if ((conn->seen_in_pollset & APR_POLLHUP) != 0) { > - return APR_SUCCESS; > - } > - } > - if ((events & APR_POLLHUP) != 0) { > - /* The connection got reset by the server. */ > - return conn->perform_hangup(conn); > - } > - if ((events & APR_POLLERR) != 0) { > - /* We might be talking to a buggy HTTP server that doesn't > - * do lingering-close. (httpd < 2.1.8 does this.) > - * > - * See: > - * > - * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292 > - */ > - if (conn->completed_requests && !conn->probable_keepalive_limit) { > - return reset_connection(conn, 1); > - } > -#ifdef SO_ERROR > - /* If possible, get the error from the platform's socket layer and > - convert it to an APR status code. */ > - { > - apr_os_sock_t osskt; > - if (!apr_os_sock_get(&osskt, conn->skt)) { > - int error; > - apr_socklen_t l = sizeof(error); > - > - if (!getsockopt(osskt, SOL_SOCKET, SO_ERROR, (char*)&error, > - &l)) { > - status = APR_FROM_OS_ERROR(error); > - > - /* Handle fallback for multi-homed servers. > - > - ### Improve algorithm to find better than just 'next'? > - > - Current Windows versions already handle re-ordering > for > - api users by using statistics on the recently failed > - connections to order the list of addresses. */ > - if (conn->completed_requests == 0 > - && conn->address->next != NULL > - && (APR_STATUS_IS_ECONNREFUSED(status) > - || APR_STATUS_IS_TIMEUP(status) > - || APR_STATUS_IS_ENETUNREACH(status))) { > - > - conn->address = conn->address->next; > - return reset_connection(conn, 1); > - } > - > - return status; > - } > - } > - } > -#endif > - return APR_EGENERAL; > - } > - if ((events & APR_POLLOUT) != 0) { > - if (conn->wait_for_connect) { > - conn->wait_for_connect = FALSE; > - > - /* We are now connected. Socket is now usable */ > - serf_io__set_pollset_dirty(&conn->io); > - > - if ((status = connect_connection(conn)) != APR_SUCCESS) > - return status; > - } > - > - if ((status = conn->perform_write(conn)) != APR_SUCCESS) > - return status; > - } > - return APR_SUCCESS; > -} > - > -apr_status_t serf__process_connection(serf_connection_t *conn, > - apr_int16_t events) > -{ > - serf_context_t *ctx = conn->ctx; > - apr_pollfd_t tdesc = { 0 }; > - > - /* If this connection has already failed, return the error again, and try > - * to remove it from the pollset again > - */ > - if (conn->status) { > - tdesc.desc_type = APR_POLL_SOCKET; > - tdesc.desc.s = conn->skt; > - tdesc.reqevents = conn->io.reqevents; > - ctx->pollset_rm(ctx->pollset_baton, > - &tdesc, &conn->io); > - return conn->status; > - } > - /* apr_pollset_poll() can return a conn multiple times... */ > - if ((conn->seen_in_pollset & events) != 0 || > - (conn->seen_in_pollset & APR_POLLHUP) != 0) { > - return APR_SUCCESS; > - } > - > - conn->seen_in_pollset |= events; > - > - if ((conn->status = process_connection(conn, events)) != APR_SUCCESS) > - { > - /* it's possible that the connection was already reset and thus the > - socket cleaned up. */ > - if (conn->skt) { > - tdesc.desc_type = APR_POLL_SOCKET; > - tdesc.desc.s = conn->skt; > - tdesc.reqevents = conn->io.reqevents; > - ctx->pollset_rm(ctx->pollset_baton, > - &tdesc, &conn->io); > - } > - return conn->status; > - } > - return APR_SUCCESS; > -} > - > -serf_connection_t *serf_connection_create( > - serf_context_t *ctx, > - apr_sockaddr_t *address, > - serf_connection_setup_t setup, > - void *setup_baton, > - serf_connection_closed_t closed, > - void *closed_baton, > - apr_pool_t *pool) > -{ > - serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn)); > - > - conn->ctx = ctx; > - conn->status = APR_SUCCESS; > - /* Ignore server address if proxy was specified. */ > - conn->address = ctx->proxy_address ? ctx->proxy_address : address; > - conn->setup = setup; > - conn->setup_baton = setup_baton; > - conn->closed = closed; > - conn->closed_baton = closed_baton; > - conn->pool = pool; > - conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL); > - conn->stream = NULL; > - conn->ostream_head = NULL; > - conn->ostream_tail = NULL; > - conn->io.type = SERF_IO_CONN; > - conn->io.u.conn = conn; > - conn->io.ctx = ctx; > - conn->io.dirty_conn = false; > - conn->io.reqevents = 0; > - conn->hit_eof = 0; > - conn->state = SERF_CONN_INIT; > - conn->latency = -1; /* unknown */ > - conn->stop_writing = 0; > - conn->write_now = 0; > - conn->wait_for_connect = 0; > - conn->pipelining = 1; > - conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1; > - conn->perform_read = read_from_connection; > - conn->perform_write = write_to_connection; > - conn->perform_hangup = hangup_connection; > - conn->perform_teardown = NULL; > - conn->protocol_baton = NULL; > - > - conn->written_reqs = conn->written_reqs_tail = NULL; > - conn->nr_of_written_reqs = 0; > - > - conn->unwritten_reqs = conn->unwritten_reqs_tail = NULL; > - conn->nr_of_unwritten_reqs; > - > - conn->done_reqs = conn->done_reqs_tail = 0; > - > - /* Create a subpool for our connection. */ > - apr_pool_create(&conn->skt_pool, conn->pool); > - > - /* register a cleanup */ > - apr_pool_cleanup_register(conn->pool, conn, clean_conn, > - apr_pool_cleanup_null); > - > - /* Add the connection to the context. */ > - *(serf_connection_t **)apr_array_push(ctx->conns) = conn; > - > - return conn; > -} > - > -apr_status_t serf_connection_create2( > - serf_connection_t **conn, > - serf_context_t *ctx, > - apr_uri_t host_info, > - serf_connection_setup_t setup, > - void *setup_baton, > - serf_connection_closed_t closed, > - void *closed_baton, > - apr_pool_t *pool) > -{ > - apr_status_t status = APR_SUCCESS; > - serf_config_t *config; > - serf_connection_t *c; > - apr_sockaddr_t *host_address = NULL; > - > - /* Set the port number explicitly, needed to create the socket later. */ > - if (!host_info.port) { > - host_info.port = apr_uri_port_of_scheme(host_info.scheme); > - } > - > - /* Only lookup the address of the server if no proxy server was > - configured. */ > - if (!ctx->proxy_address) { > - status = apr_sockaddr_info_get(&host_address, > - host_info.hostname, > - APR_UNSPEC, host_info.port, 0, pool); > - if (status) > - return status; > - } > - > - c = serf_connection_create(ctx, host_address, setup, setup_baton, > - closed, closed_baton, pool); > - > - /* We're not interested in the path following the hostname. */ > - c->host_url = apr_uri_unparse(c->pool, > - &host_info, > - APR_URI_UNP_OMITPATHINFO | > - APR_URI_UNP_OMITUSERINFO); > - > - /* Store the host info without the path on the connection. */ > - (void)apr_uri_parse(c->pool, c->host_url, &(c->host_info)); > - if (!c->host_info.port) { > - c->host_info.port = apr_uri_port_of_scheme(c->host_info.scheme); > - } > - > - /* Store the connection specific info in the configuration store */ > - status = serf__config_store_get_config(ctx, c, &config, pool); > - if (status) > - return status; > - c->config = config; > - serf_config_set_stringc(config, SERF_CONFIG_HOST_NAME, > - c->host_info.hostname); > - serf_config_set_stringc(config, SERF_CONFIG_HOST_PORT, > - apr_itoa(ctx->pool, c->host_info.port)); > - > - *conn = c; > - > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, c->config, > - "created connection 0x%x\n", c); > - > - return status; > -} > - > -apr_status_t serf_connection_reset( > - serf_connection_t *conn) > -{ > - return reset_connection(conn, 0); > -} > - > - > -apr_status_t serf_connection_close( > - serf_connection_t *conn) > -{ > - int i; > - serf_context_t *ctx = conn->ctx; > - apr_status_t status; > - > - for (i = ctx->conns->nelts; i--; ) { > - serf_connection_t *conn_seq = GET_CONN(ctx, i); > - > - if (conn_seq == conn) { > - > - /* Clean up the write bucket first, as this marks all partially > written > - requests as fully written, allowing more efficient cleanup */ > - serf__connection_pre_cleanup(conn); > - > - /* The application asked to close the connection, no need to > notify > - it for each cancelled request. */ > - while (conn->written_reqs) { > - serf__cancel_request(conn->written_reqs, &conn->written_reqs, > 0); > - } > - while (conn->unwritten_reqs) { > - serf__cancel_request(conn->unwritten_reqs, &conn- > >unwritten_reqs, 0); > - } > - if (conn->skt != NULL) { > - remove_connection(ctx, conn); > - status = clean_skt(conn); > - if (conn->closed != NULL) { > - handle_conn_closed(conn, status); > - } > - } > - if (conn->stream != NULL) { > - serf_bucket_destroy(conn->stream); > - conn->stream = NULL; > - } > - > - if (conn->protocol_baton) { > - conn->perform_teardown(conn); > - conn->protocol_baton = NULL; > - } > - > - /* Remove the connection from the context. We don't want to > - * deal with it any more. > - */ > - if (i < ctx->conns->nelts - 1) { > - /* move later connections over this one. */ > - memmove( > - &GET_CONN(ctx, i), > - &GET_CONN(ctx, i + 1), > - (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t > *)); > - } > - --ctx->conns->nelts; > - > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn- > >config, > - "closed connection 0x%x\n", conn); > - > - /* Found the connection. Closed it. All done. */ > - return APR_SUCCESS; > - } > - } > - > - /* We didn't find the specified connection. */ > - /* ### doc talks about this w.r.t poll structures. use something else? */ > - return APR_NOTFOUND; > -} > - > - > -void serf_connection_set_max_outstanding_requests( > - serf_connection_t *conn, > - unsigned int max_requests) > -{ > - if (max_requests == 0) > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > - "Set max. nr. of outstanding requests for this " > - "connection to unlimited.\n"); > - else > - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, > - "Limit max. nr. of outstanding requests for this " > - "connection to %u.\n", max_requests); > - > - conn->max_outstanding_requests = max_requests; > -} > - > -/* Disable HTTP pipelining, ensure that only one request is outstanding at a > - time. This is an internal method, an application that wants to disable > - HTTP pipelining can achieve this by calling: > - serf_connection_set_max_outstanding_requests(conn, 1) . > - */ > -void serf__connection_set_pipelining(serf_connection_t *conn, int > enabled) > -{ > - conn->pipelining = enabled; > -} > - > -void serf_connection_set_async_responses( > - serf_connection_t *conn, > - serf_response_acceptor_t acceptor, > - void *acceptor_baton, > - serf_response_handler_t handler, > - void *handler_baton) > -{ > - conn->async_responses = 1; > - conn->async_acceptor = acceptor; > - conn->async_acceptor_baton = acceptor_baton; > - conn->async_handler = handler; > - conn->async_handler_baton = handler_baton; > -} > - > -void serf_connection_set_framing_type( > - serf_connection_t *conn, > - serf_connection_framing_type_t framing_type) > -{ > - conn->framing_type = framing_type; > - > - if (conn->skt) { > - serf_io__set_pollset_dirty(&conn->io); > - conn->stop_writing = 0; > - conn->write_now = 1; > - > - /* Close down existing protocol */ > - if (conn->protocol_baton) { > - conn->perform_teardown(conn); > - conn->protocol_baton = NULL; > - } > - } > - > - /* Reset to default */ > - conn->perform_read = read_from_connection; > - conn->perform_write = write_to_connection; > - conn->perform_hangup = hangup_connection; > - conn->perform_teardown = NULL; > - > - switch (framing_type) { > - case SERF_CONNECTION_FRAMING_TYPE_HTTP2: > - serf__http2_protocol_init(conn); > - break; > - default: > - break; > - } > -} > - > -apr_interval_time_t serf_connection_get_latency(serf_connection_t > *conn) > -{ > - if (conn->ctx->proxy_address) { > - /* Detecting network latency for proxied connection is not > implemented > - yet. */ > - return -1; > - } > - > - return conn->latency; > -} > - > -unsigned int serf_connection_queued_requests(serf_connection_t *conn) > -{ > - return conn->nr_of_unwritten_reqs; > -} > - > -unsigned int serf_connection_pending_requests(serf_connection_t *conn) > -{ > - return conn->nr_of_unwritten_reqs + conn->nr_of_written_reqs; > -} > > Modified: serf/trunk/serf_private.h > URL: > http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715005&r1=1 > 715004&r2=1715005&view=diff > ========================================================== > ==================== > --- serf/trunk/serf_private.h (original) > +++ serf/trunk/serf_private.h Wed Nov 18 14:45:58 2015 > @@ -147,6 +147,33 @@ typedef struct serf_io_baton_t { > > } serf_io_baton_t; > > +typedef struct serf_pump_io_t > +{ > + serf_io_baton_t *io; > + > + serf_bucket_alloc_t *allocator; > + serf_config_t *config; > + > + serf_bucket_t *stream; > + serf_bucket_t *ostream_head; > + serf_bucket_t *ostream_tail; > + > + apr_socket_t *skt; > + > + /* Outgoing vecs, waiting to be written. > + Read from ostream_head */ > + struct iovec vec[IOV_MAX]; > + int vec_len; > + > + /* True when connection failed while writing */ > + bool done_writing; > + bool stop_writing; /* Wait for read (E.g. SSL) */ > + > + /* Set to true when ostream_tail was read to EOF */ > + bool hit_eof; > +} serf_pump_t; > + > + > /* Should we use static APR_INLINE instead? */ > #define serf_io__set_pollset_dirty(io_baton) \ > do \ > @@ -381,6 +408,7 @@ struct serf_incoming_t { > serf_context_t *ctx; > > serf_io_baton_t io; > + serf_pump_t pump; > serf_incoming_request_setup_t req_setup; > void *req_setup_baton; > > @@ -403,8 +431,6 @@ struct serf_incoming_t { > serf_connection_framing_type_t framing_type; > > bool wait_for_connect; > - bool hit_eof; > - bool stop_writing; > > /* Event callbacks, called from serf__process_client() to do the actual > processing. */ > @@ -416,13 +442,6 @@ struct serf_incoming_t { > void(*perform_teardown)(serf_incoming_t *conn); > void *protocol_baton; > > - /* A bucket wrapped around our socket (for reading responses). */ > - serf_bucket_t *stream; > - /* A reference to the aggregate bucket that provides the boundary > between > - * request level buckets and connection level buckets. > - */ > - serf_bucket_t *ostream_head; > - serf_bucket_t *ostream_tail; > > serf_config_t *config; > > @@ -742,6 +761,25 @@ void serf__link_requests(serf_request_t > apr_status_t serf__handle_response(serf_request_t *request, > apr_pool_t *pool); > > +/* From pump.c */ > +void serf_pump__init(serf_pump_t *pump, > + serf_io_baton_t *io, > + apr_socket_t *skt, > + serf_config_t *config, > + serf_bucket_alloc_t *allocator, > + apr_pool_t *pool); > + > +bool serf_pump__data_pending(serf_pump_t *pump); > +void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump); > + > +apr_status_t serf_pump__write(serf_pump_t *pump, > + bool fetch_new); > + > +/* These must always be called as a pair to avoid a memory leak */ > +void serf_pump__prepare_setup(serf_pump_t *pump); > +void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t > *ostream); > + > + > /** Logging functions. **/ > > /* Initialize the logging subsystem. This will store a log baton in the >