Author: rhuijben Date: Mon Nov 16 17:12:05 2015 New Revision: 1714631 URL: http://svn.apache.org/viewvc?rev=1714631&view=rev Log: Add initial generic support for sending http2 request and response bodies.
In theory this should make our http/2 support fully functional, although I expect that we will need quite a bit of tweaking to make this production-ready. This patch implements a simple linked list of streams that still have data left to write and walks over them until it finds one that can be written given the window rules. This code may need a lot of improvement to fully support http/2 priority queueing, but the current implementation matches the expectations of a limited server/client within the specs. (Such improvement is optional.) * protocols/http2_protocol.c (serf_http2_protocol_t): Add linked list of writable requests. (http2_write_data): New function. (http2_outgoing_write, http2_incoming_write): Call http2_write_data when buffered data was flushed. (serf_http2__ensure_writable): New function. * protocols/http2_protocol.h (serf_http2_stream_t): Add fields. * protocols/http2_stream.c (serf_http2_stream_data_t): Rename body to data. (serf_http2__stream_create): Initialize fields. (stream_send_body): Rename to... (stream_send_data): ... this. (serf_http2__stream_write_data): New function. (destroy_request_bucket): New function. (serf_http2__stream_setup_next_request): Schedule body writing. (http2_stream_enqueue_response): Update caller. * test/test_server.c (test_listen_http2): Send bodies, like the http1 test. 
Modified: serf/trunk/protocols/http2_protocol.c serf/trunk/protocols/http2_protocol.h serf/trunk/protocols/http2_stream.c serf/trunk/test/test_server.c Modified: serf/trunk/protocols/http2_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1714631&r1=1714630&r2=1714631&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.c (original) +++ serf/trunk/protocols/http2_protocol.c Mon Nov 16 17:12:05 2015 @@ -160,6 +160,8 @@ struct serf_http2_protocol_t serf_bucket_t *continuation_bucket; apr_int32_t continuation_streamid; + + serf_http2_stream_t *first_writable, *last_writable, *cur_writable; }; /* Forward definition */ @@ -1595,6 +1597,47 @@ http2_process(serf_http2_protocol_t *h2) } /* while(TRUE) */ } +static apr_status_t http2_write_data(serf_http2_protocol_t *h2) +{ + serf_http2_stream_t *stream = h2->cur_writable; + + while (TRUE) + { + apr_status_t status; + + if (!stream) + stream = h2->first_writable; + + if (!stream) + return APR_SUCCESS; + + if (stream->status != H2S_OPEN + && stream->status != H2S_HALFCLOSED_REMOTE) + { + /* This stream is NOT writable. Remove it */ + if (stream->prev_writable) + stream->prev_writable->next_writable = stream->next_writable; + else + h2->first_writable = stream->next_writable; + + if (stream->next_writable) + stream->next_writable->prev_writable = stream->prev_writable; + else + h2->last_writable = stream->prev_writable; + + stream = stream->next_writable; + continue; + } + + status = serf_http2__stream_write_data(stream); + if (status || stream->lr_window == 0) + h2->cur_writable = stream->next_writable; + + return status ? status : APR_EAGAIN; + } + +} + static apr_status_t http2_outgoing_read(serf_connection_t *conn) { @@ -1649,6 +1692,10 @@ http2_outgoing_write(serf_connection_t * else if (status) return status; + status = http2_write_data(h2); + if (status) + return status; + /* Probably nothing to write. 
Connection will check new requests */ conn->dirty_conn = 1; conn->ctx->dirty_pollset = 1; @@ -1754,7 +1801,7 @@ http2_incoming_read(serf_incoming_t *cli static apr_status_t http2_incoming_write(serf_incoming_t *client) { - /* serf_http2_protocol_t *h2 = client->protocol_baton; */ + serf_http2_protocol_t *h2 = client->protocol_baton; apr_status_t status; status = serf__incoming_client_flush(client, TRUE); @@ -1764,6 +1811,10 @@ http2_incoming_write(serf_incoming_t *cl else if (status) return status; + status = http2_write_data(h2); + if (status) + return status; + /* Probably nothing to write. Connection will check new requests */ client->dirty_conn = true; client->ctx->dirty_pollset = true; @@ -1952,3 +2003,20 @@ void serf_http2__return_window(serf_http h2->lr_window += returned; stream->lr_window += returned; } + +void serf_http2__ensure_writable(serf_http2_stream_t *stream) +{ + serf_http2_protocol_t *h2 = stream->h2; + SERF_H2_assert(stream->status == H2S_OPEN + || stream->status == H2S_HALFCLOSED_REMOTE); + + if (stream->next_writable || stream->prev_writable) + return; + + stream->prev_writable = h2->last_writable; + h2->last_writable = stream; + if (h2->first_writable) + h2->last_writable->next_writable = stream; + else + h2->first_writable = stream; +} Modified: serf/trunk/protocols/http2_protocol.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1714631&r1=1714630&r2=1714631&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.h (original) +++ serf/trunk/protocols/http2_protocol.h Mon Nov 16 17:12:05 2015 @@ -146,6 +146,7 @@ typedef struct serf_http2_stream_t struct serf_http2_stream_t *new_reserved_stream; /* TODO: Priority, etc. 
*/ + struct serf_http2_stream_t *next_writable, *prev_writable; } serf_http2_stream_t; typedef apr_status_t (* serf_http2_processor_t)(void *baton, @@ -216,6 +217,8 @@ void serf_http2__return_window(serf_http serf_http2_stream_t *stream, apr_size_t returned); +void serf_http2__ensure_writable(serf_http2_stream_t *stream); + apr_status_t serf_http2__stream_reset(serf_http2_stream_t *stream, apr_status_t reason, @@ -244,6 +247,9 @@ serf_http2__stream_processor(void *baton serf_http2_protocol_t *h2, serf_bucket_t *bucket); +apr_status_t +serf_http2__stream_write_data(serf_http2_stream_t *stream); + #ifdef __cplusplus } #endif Modified: serf/trunk/protocols/http2_stream.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1714631&r1=1714630&r2=1714631&view=diff ============================================================================== --- serf/trunk/protocols/http2_stream.c (original) +++ serf/trunk/protocols/http2_stream.c Mon Nov 16 17:12:05 2015 @@ -36,7 +36,7 @@ struct serf_http2_stream_data_t serf_incoming_request_t *in_request; serf_bucket_t *response_agg; serf_hpack_table_t *tbl; - serf_bucket_t *body_tail; + serf_bucket_t *data_tail; }; serf_http2_stream_t * @@ -60,7 +60,7 @@ serf_http2__stream_create(serf_http2_pro stream->data->in_request = NULL; stream->data->response_agg = NULL; stream->data->tbl = NULL; - stream->data->body_tail = NULL; + stream->data->data_tail = NULL; stream->lr_window = lr_window; stream->rl_window = rl_window; @@ -73,6 +73,8 @@ serf_http2__stream_create(serf_http2_pro stream->status = (streamid >= 0) ? 
H2S_IDLE : H2S_INIT; stream->new_reserved_stream = NULL; + stream->prev_writable = stream->next_writable = NULL; + return stream; } @@ -196,8 +198,8 @@ static apr_status_t data_write_done(void } -static apr_status_t stream_send_body(serf_http2_stream_t *stream, - serf_bucket_t *body) +static apr_status_t stream_send_data(serf_http2_stream_t *stream, + serf_bucket_t *data) { apr_uint64_t remaining; serf_bucket_t *next; @@ -207,16 +209,16 @@ static apr_status_t stream_send_body(ser SERF_H2_assert(stream->status == H2S_OPEN || stream->status == H2S_HALFCLOSED_REMOTE); - SERF_H2_assert(!stream->data->body_tail || (body == - stream->data->body_tail)); + SERF_H2_assert(!stream->data->data_tail || (data == + stream->data->data_tail)); /* Sending DATA frames over HTTP/2 is not easy as this usually requires handling windowing, priority, etc. This code will improve over time */ - if (!body) + if (!data) remaining = 0; else - remaining = serf_bucket_get_remaining(body); + remaining = serf_bucket_get_remaining(data); /* If the stream decided we are already done */ if (remaining == 0) { @@ -225,12 +227,14 @@ static apr_status_t stream_send_body(ser else stream->status = H2S_CLOSED; + serf_bucket_destroy(data); + next = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_DATA, HTTP2_FLAG_END_STREAM, &stream->streamid, serf_http2__allocate_stream_id, stream, 0, stream->alloc); - stream->data->body_tail = NULL; + stream->data->data_tail = NULL; return serf_http2__enqueue_frame(stream->h2, next, false); } @@ -241,20 +245,20 @@ static apr_status_t stream_send_body(ser if (prefix_len == 0) { /* No window left */ - stream->data->body_tail = body; + stream->data->data_tail = data; return APR_SUCCESS; } if (prefix_len < remaining) { window_allocate_info_t *wai; - serf_bucket_split_create(&body, &stream->data->body_tail, body, + serf_bucket_split_create(&data, &stream->data->data_tail, data, MIN(prefix_len, 1024), prefix_len); wai = serf_bucket_mem_alloc(stream->alloc, sizeof(*wai)); 
wai->stream = stream; wai->allocated = prefix_len; - body = serf__bucket_event_create(body, wai, + data = serf__bucket_event_create(data, wai, data_write_started, data_write_done, NULL, stream->alloc); end_stream = false; @@ -262,23 +266,46 @@ static apr_status_t stream_send_body(ser else end_stream = true; - next = serf__bucket_http2_frame_create(body, HTTP2_FRAME_TYPE_DATA, + next = serf__bucket_http2_frame_create(data, HTTP2_FRAME_TYPE_DATA, end_stream ? HTTP2_FLAG_END_STREAM : 0, &stream->streamid, serf_http2__allocate_stream_id, stream, prefix_len, - body->allocator); + data->allocator); status = serf_http2__enqueue_frame(stream->h2, next, TRUE); - if (!end_stream) - return APR_ENOTIMPL; /* The rest of the body won't be sent yet */ + if (!end_stream) { + /* Write more later */ + serf_http2__ensure_writable(stream); + } return status; } apr_status_t +serf_http2__stream_write_data(serf_http2_stream_t *stream) +{ + SERF_H2_assert(stream->status == H2S_OPEN + || stream->status == H2S_HALFCLOSED_REMOTE); + SERF_H2_assert(stream->data->data_tail != NULL); + + return stream_send_data(stream, stream->data->data_tail); +} + +apr_status_t destroy_request_bucket(void *baton, + apr_uint64_t bytes_read) +{ + serf_request_t *request = baton; + + serf_bucket_destroy(request->req_bkt); + request->req_bkt = NULL; + request->writing = SERF_WRITING_FINISHED; + return APR_SUCCESS; +} + +apr_status_t serf_http2__stream_setup_next_request(serf_http2_stream_t *stream, serf_connection_t *conn, apr_size_t max_payload_size, @@ -337,13 +364,20 @@ serf_http2__stream_setup_next_request(se if (end_stream) { stream->status = H2S_HALFCLOSED_LOCAL; /* Headers sent; no body */ - } - else { - stream->status = H2S_OPEN; /* Headers sent. Body to go */ - /* ### TODO: Schedule body to be sent */ + return APR_SUCCESS; } - return APR_SUCCESS; + /* Yuck... 
we are not allowed to destroy body */ + body = serf_bucket_barrier_create(body, request->allocator); + + /* Setup an event bucket to destroy the actual request bucket when + the body is done */ + body = serf__bucket_event_create(body, request, + NULL, NULL, destroy_request_bucket, + request->allocator); + + stream->status = H2S_OPEN; /* Headers sent. Body to go */ + return stream_send_data(stream, body); } apr_status_t @@ -442,7 +476,7 @@ http2_stream_enqueue_response(serf_incom if (status) return status; - return stream_send_body(stream, response_bkt); + return stream_send_data(stream, response_bkt); } static apr_status_t Modified: serf/trunk/test/test_server.c URL: http://svn.apache.org/viewvc/serf/trunk/test/test_server.c?rev=1714631&r1=1714630&r2=1714631&view=diff ============================================================================== --- serf/trunk/test/test_server.c (original) +++ serf/trunk/test/test_server.c Mon Nov 16 17:12:05 2015 @@ -232,9 +232,8 @@ void test_listen_http2(CuTest *tc) tb->pool); CuAssertIntEquals(tc, APR_SUCCESS, status); - /* Our http2 implementation doesn't support request bodies yet */ - create_new_request(tb, &handler_ctx[0], "GET", "/", -1); - create_new_request(tb, &handler_ctx[1], "GET", "/", -1); + create_new_request(tb, &handler_ctx[0], "GET", "/", 1); + create_new_request(tb, &handler_ctx[1], "GET", "/", 2); status = run_client_server_loop(tb, num_requests, handler_ctx, tb->pool);