Author: rhuijben Date: Mon Nov 23 16:41:12 2015 New Revision: 1715871 URL: http://svn.apache.org/viewvc?rev=1715871&view=rev Log: Backport recent pump fixes to the pump-investigate branch to allow testing them on OS/X.
* buckets/ssl_buckets.c * incoming.c * pump.c * serf_private.h * test/serf_httpd.c * test/test_server.c Merge some recent work. Modified: serf/branches/pump-investigate/buckets/ssl_buckets.c serf/branches/pump-investigate/incoming.c serf/branches/pump-investigate/pump.c serf/branches/pump-investigate/serf_private.h serf/branches/pump-investigate/test/test_server.c Modified: serf/branches/pump-investigate/buckets/ssl_buckets.c URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/buckets/ssl_buckets.c?rev=1715871&r1=1715870&r2=1715871&view=diff ============================================================================== --- serf/branches/pump-investigate/buckets/ssl_buckets.c (original) +++ serf/branches/pump-investigate/buckets/ssl_buckets.c Mon Nov 23 16:41:12 2015 @@ -177,7 +177,7 @@ struct serf_ssl_context_t { const char *selected_protocol; /* Cached protocol value once available */ /* Protocol callback */ - serf_ssl_protocol_result_cb_t protocol_callback; + serf_ssl_protocol_result_cb_t protocol_callback; void *protocol_userdata; serf_config_t *config; @@ -255,7 +255,7 @@ apps_ssl_info_callback(const SSL *s, int if (ret > 0) { /* ret > 0: Just a state change; not an error */ serf__log(level, LOGCOMP_SSL, __FILE__, ctx->config, - "%s: %s\n", + "%s: %s (%d)\n", str, SSL_state_string_long(s), ctx->crypt_status); } @@ -282,7 +282,7 @@ apps_ssl_info_callback(const SSL *s, int #endif -/* Listens for the SSL renegotiate ciphers alert and report it back to the +/* Listens for the SSL renegotiate ciphers alert and report it back to the serf context. */ static void detect_renegotiate(const SSL *s, int where, int ret) @@ -647,7 +647,7 @@ get_subject_alt_names(apr_array_header_t } sk_GENERAL_NAME_pop_free(names, GENERAL_NAME_free); } - + return APR_SUCCESS; } @@ -698,7 +698,7 @@ validate_server_certificate(int cert_val int err = X509_STORE_CTX_get_error(store_ctx); switch(err) { - case X509_V_ERR_CERT_NOT_YET_VALID: + case X509_V_ERR_CERT_NOT_YET_VALID: failures |= SERF_SSL_CERT_NOTYETVALID; break; case X509_V_ERR_CERT_HAS_EXPIRED: @@ -840,7 +840,7 @@ validate_server_certificate(int cert_val { ctx->pending_err = SERF_ERROR_SSL_CERT_FAILED; } - + return cert_valid; } @@ -1005,7 +1005,7 @@ static apr_status_t ssl_decrypt(void *ba serf__log(LOGLVL_DEBUG, LOGCOMP_SSLMSG, __FILE__, ctx->config, "---\n%.*s\n-(%d)-\n", *len, buf, *len); } - + if (!ctx->handshake_finished && !SERF_BUCKET_READ_ERROR(status)) { @@ -1166,7 +1166,7 @@ static apr_status_t ssl_encrypt(void *ba serf__log(LOGLVL_DEBUG, LOGCOMP_SSL, __FILE__, ctx->config, "---\n%.*s\n-(%d)-\n", interim_len, vecs_data, interim_len); - + } } } @@ -1352,10 +1352,10 @@ static void init_ssl_libraries(void) thread has completed */ while (val != INIT_DONE) { apr_sleep(APR_USEC_PER_SEC / 1000); - + val = apr_atomic_cas32(&have_init_ssl, INIT_UNINITIALIZED, - INIT_UNINITIALIZED); + INIT_UNINITIALIZED); } } } @@ -2196,7 +2196,7 @@ const char *serf_ssl_cert_export( encoded_cert = apr_palloc(pool, apr_base64_encode_len(len)); apr_base64_encode(encoded_cert, binary_cert, len); - + return encoded_cert; } Modified: serf/branches/pump-investigate/incoming.c URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/incoming.c?rev=1715871&r1=1715870&r2=1715871&view=diff ============================================================================== --- serf/branches/pump-investigate/incoming.c (original) +++ serf/branches/pump-investigate/incoming.c Mon Nov 23 16:41:12 2015 @@ -32,12 +32,13 @@ static apr_status_t client_connected(ser { /* serf_context_t *ctx = client->ctx; */ apr_status_t status; + serf_bucket_t *stream; serf_bucket_t *ostream; serf_pump__store_ipaddresses_in_config(&client->pump); serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config, - "socket for client 0x%x connected\n", client); + "socket for client 0x%p connected\n", client); /* ### Connection does auth setup here */ @@ -46,17 +47,17 @@ static apr_status_t client_connected(ser ostream = client->pump.ostream_tail; status = client->setup(client->skt, - &client->pump.stream, + &stream, &ostream, client->setup_baton, client->pool); if (status) { - serf_pump__complete_setup(&client->pump, NULL); + serf_pump__complete_setup(&client->pump, NULL, NULL); /* ### Cleanup! (serf__connection_pre_cleanup) */ return status; } - serf_pump__complete_setup(&client->pump, ostream); + serf_pump__complete_setup(&client->pump, stream, ostream); if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) { client->proto_peek_bkt = serf_bucket_aggregate_create( Modified: serf/branches/pump-investigate/pump.c URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/pump.c?rev=1715871&r1=1715870&r2=1715871&view=diff ============================================================================== --- serf/branches/pump-investigate/pump.c (original) +++ serf/branches/pump-investigate/pump.c Mon Nov 23 16:41:12 2015 @@ -29,7 +29,7 @@ #include "serf_private.h" -apr_status_t pump_cleanup(void *baton) +static apr_status_t pump_cleanup(void *baton) { serf_pump_t *pump = baton; @@ -42,6 +42,11 @@ apr_status_t pump_cleanup(void *baton) pump->ostream_tail = NULL; } + pump->pool = NULL; /* Don't run again */ + pump->allocator = NULL; + pump->skt = NULL; + pump->vec_len = 0; + return APR_SUCCESS; } @@ -58,11 +63,34 @@ void serf_pump__init(serf_pump_t *pump, pump->allocator = allocator; pump->config = config; pump->skt = skt; + pump->pool = pool; apr_pool_cleanup_register(pool, pump, pump_cleanup, apr_pool_cleanup_null); } +void serf_pump__done(serf_pump_t *pump) +{ + if (pump->pool) { + apr_pool_cleanup_run(pump->pool, pump, pump_cleanup); + } + + pump->io = NULL; + pump->allocator = NULL; + pump->config = NULL; + + /* pump->stream is managed by the current reader! */ + + pump->ostream_head = NULL; + pump->ostream_tail = NULL; + + pump->done_writing = false; + pump->stop_writing = false; + pump->hit_eof = false; + + pump->pool = NULL; +} + /* Safely check if there is still data pending on the connection, carefull to not accidentally make it invalid. */ bool serf_pump__data_pending(serf_pump_t *pump) @@ -78,9 +106,6 @@ bool serf_pump__data_pending(serf_pump_t status = serf_bucket_peek(pump->ostream_head, &data, &len); if (!SERF_BUCKET_READ_ERROR(status)) { if (len > 0) { - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config, - "Extra data to be written after sending complete " - "requests.\n"); return true; } } @@ -118,8 +143,10 @@ void serf_pump__prepare_setup(serf_pump_ } void serf_pump__complete_setup(serf_pump_t *pump, + serf_bucket_t *stream, serf_bucket_t *ostream) { + pump->stream = stream; if (ostream) serf_bucket_aggregate_append(pump->ostream_head, ostream); else @@ -133,7 +160,10 @@ void serf_pump__complete_setup(serf_pump /* Share the configuration with the ssl_decrypt and socket buckets. The response buckets wrapping the ssl_decrypt/socket buckets won't get the config automatically because they are upstream. */ - serf_bucket_set_config(pump->stream, pump->config); + if (stream != NULL) { + pump->stream = stream; + serf_bucket_set_config(pump->stream, pump->config); + } /* We typically have one of two scenarios, based on whether the application decided to encrypt this connection: @@ -162,13 +192,13 @@ void serf_pump__store_ipaddresses_in_con char buf[48]; if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_LOCALIP, - "%s:%d", buf, sa->port); + "%s:%d", buf, (int)sa->port); } if (apr_socket_addr_get(&sa, APR_REMOTE, pump->skt) == APR_SUCCESS) { char buf[48]; if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_REMOTEIP, - "%s:%d", buf, sa->port); + "%s:%d", buf, (int)sa->port); } } @@ -177,7 +207,7 @@ static apr_status_t no_more_writes(serf_ /* Note that we should hold new requests until we open our new socket. */ pump->done_writing = true; serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config, - "stop writing on 0x%x\n", pump->io->u.conn); + "stop writing on 0x%p\n", pump->io->u.v); /* Clear our iovec. */ pump->vec_len = 0; @@ -199,7 +229,7 @@ static apr_status_t socket_writev(serf_p pump->vec_len, &written); if (status && !APR_STATUS_IS_EAGAIN(status)) serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config, - "socket_sendv error %d\n", status); + "socket_sendv error %d on 0x%p\n", status, pump->io->u.v); /* did we write everything? */ if (written) { @@ -207,26 +237,31 @@ static apr_status_t socket_writev(serf_p int i; serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "--- socket_sendv: %d bytes. --\n", written); + "--- socket_sendv: %d bytes on 0x%p. --\n", + (int)written, pump->io->u.v); for (i = 0; i < conn->vec_len; i++) { len += conn->vec[i].iov_len; if (written < len) { serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, - "%.*s", conn->vec[i].iov_len - (len - written), - conn->vec[i].iov_base); + "%.*s", + (int)(conn->vec[i].iov_len - (len - written)), + (const char *)conn->vec[i].iov_base); if (i) { memmove(conn->vec, &conn->vec[i], sizeof(struct iovec) * (conn->vec_len - i)); conn->vec_len -= i; } - conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written)); + conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + + conn->vec[0].iov_len + - (len - written); conn->vec[0].iov_len = len - written; break; } else { serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "%.*s", - conn->vec[i].iov_len, conn->vec[i].iov_base); + (int)conn->vec[i].iov_len, + (const char*)conn->vec[i].iov_base); } } if (len == written) { @@ -246,16 +281,15 @@ apr_status_t serf_pump__write(serf_pump_ { apr_status_t status = APR_SUCCESS; apr_status_t read_status = APR_SUCCESS; - serf_pump_t *const conn = pump; - conn->hit_eof = FALSE; + pump->hit_eof = FALSE; while (status == APR_SUCCESS) { /* First try to write out what is already stored in the connection vecs. */ - while (conn->vec_len && !status) { - status = socket_writev(conn); + while (pump->vec_len && !status) { + status = socket_writev(pump); /* If the write would have blocked, then we're done. * Don't try to write anything else to the socket. @@ -263,12 +297,22 @@ apr_status_t serf_pump__write(serf_pump_ if (APR_STATUS_IS_EPIPE(status) || APR_STATUS_IS_ECONNRESET(status) || APR_STATUS_IS_ECONNABORTED(status)) - return no_more_writes(conn); + return no_more_writes(pump); } - if (status || !pump) + if (status || !fetch_new) { + + /* If we couldn't write everything that we tried, + make sure that we will receive a write event next time */ + if (APR_STATUS_IS_EAGAIN(status) + && !pump->io->dirty_conn + && !(pump->io->reqevents & APR_POLLOUT)) + { + serf_io__set_pollset_dirty(pump->io); + } return status; - else if (read_status || conn->vec_len || conn->hit_eof) + } + else if (read_status || pump->vec_len || pump->hit_eof) return read_status; /* ### optimize at some point by using read_for_sendfile */ @@ -276,12 +320,12 @@ apr_status_t serf_pump__write(serf_pump_ data as available, we probably don't want to read ALL_AVAIL, but a lower number, like the size of one or a few TCP packets, the available TCP buffer size ... */ - conn->hit_eof = 0; + pump->hit_eof = false; read_status = serf_bucket_read_iovec(pump->ostream_head, SERF_READ_ALL_AVAIL, IOV_MAX, - conn->vec, - &conn->vec_len); + pump->vec, + &pump->vec_len); if (read_status == SERF_ERROR_WAIT_CONN) { /* The bucket told us that it can't provide more data until @@ -293,15 +337,20 @@ apr_status_t serf_pump__write(serf_pump_ we can actually write something. otherwise, we could end up in a CPU spin: socket wants something, but we don't have anything (and keep returning EAGAIN) */ - conn->stop_writing = true; - serf_io__set_pollset_dirty(conn->io); + + serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, pump->config, + "Output stream requested temporary write delay " + "on 0x%p\n", pump->io->u.v); + + pump->stop_writing = true; + serf_io__set_pollset_dirty(pump->io); read_status = APR_EAGAIN; } else if (APR_STATUS_IS_EAGAIN(read_status)) { /* We read some stuff, but did we read everything ? */ - if (conn->hit_eof) + if (pump->hit_eof) read_status = APR_SUCCESS; } else if (SERF_BUCKET_READ_ERROR(read_status)) { @@ -314,3 +363,37 @@ apr_status_t serf_pump__write(serf_pump_ return status; } +apr_status_t serf_pump__add_output(serf_pump_t *pump, + serf_bucket_t *bucket, + bool flush) +{ + apr_status_t status; + + if (!flush + && !pump->io->dirty_conn + && !pump->stop_writing + && !(pump->io->reqevents & APR_POLLOUT) + && !serf_pump__data_pending(pump)) + { + /* If not writing now, + * and not already dirty + * and nothing pending yet + Then mark the pollset dirty to trigger a write */ + + serf_io__set_pollset_dirty(pump->io); + } + + serf_bucket_aggregate_append(pump->ostream_tail, bucket); + + if (!flush) + return APR_SUCCESS; + + /* Flush final output buffer (after ssl, etc.) */ + status = serf_pump__write(pump, TRUE); + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + else + return APR_SUCCESS; +} + Modified: serf/branches/pump-investigate/serf_private.h URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/serf_private.h?rev=1715871&r1=1715870&r2=1715871&view=diff ============================================================================== --- serf/branches/pump-investigate/serf_private.h (original) +++ serf/branches/pump-investigate/serf_private.h Mon Nov 23 16:41:12 2015 @@ -136,6 +136,7 @@ typedef struct serf_io_baton_t { serf_incoming_t *client; serf_connection_t *conn; serf_listener_t *listener; + const void *const v; } u; /* are we a dirty connection that needs its poll status updated? */ @@ -147,14 +148,18 @@ typedef struct serf_io_baton_t { } serf_io_baton_t; -typedef struct serf_pump_io_t +typedef struct serf_pump_t { serf_io_baton_t *io; serf_bucket_alloc_t *allocator; serf_config_t *config; + /* The incoming stream. Stored here for easy access by users, + but not managed as part of the pump */ serf_bucket_t *stream; + + /* The outgoing stream */ serf_bucket_t *ostream_head; serf_bucket_t *ostream_tail; @@ -171,6 +176,8 @@ typedef struct serf_pump_io_t /* Set to true when ostream_tail was read to EOF */ bool hit_eof; + + apr_pool_t *pool; } serf_pump_t; @@ -769,15 +776,23 @@ void serf_pump__init(serf_pump_t *pump, serf_bucket_alloc_t *allocator, apr_pool_t *pool); +void serf_pump__done(serf_pump_t *pump); + bool serf_pump__data_pending(serf_pump_t *pump); void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump); apr_status_t serf_pump__write(serf_pump_t *pump, bool fetch_new); +apr_status_t serf_pump__add_output(serf_pump_t *pump, + serf_bucket_t *bucket, + bool flush); + /* These must always be called as a pair to avoid a memory leak */ void serf_pump__prepare_setup(serf_pump_t *pump); -void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t *ostream); +void serf_pump__complete_setup(serf_pump_t *pump, + serf_bucket_t *stream, + serf_bucket_t *ostream); /** Logging functions. **/ Modified: serf/branches/pump-investigate/test/test_server.c URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/test/test_server.c?rev=1715871&r1=1715870&r2=1715871&view=diff ============================================================================== --- serf/branches/pump-investigate/test/test_server.c (original) +++ serf/branches/pump-investigate/test/test_server.c Mon Nov 23 16:41:12 2015 @@ -247,8 +247,8 @@ CuSuite *test_server(void) CuSuiteSetSetupTeardownCallbacks(suite, test_setup, test_teardown); - SUITE_ADD_TEST(suite, test_listen_http); - SUITE_ADD_TEST(suite, test_listen_http2); + /*SUITE_ADD_TEST(suite, test_listen_http); + SUITE_ADD_TEST(suite, test_listen_http2);*/ return suite; }