Author: rhuijben
Date: Mon Nov 16 16:06:05 2015
New Revision: 1714613
URL: http://svn.apache.org/viewvc?rev=1714613&view=rev
Log:
Make the http2 protocol capable of sending a response on an incoming request.
This adds basic body write support for this direction.
* protocols/http2_protocol.c
(serf_http2__max_payload_size,
serf_http2__alloc_window,
serf_http2__return_window): New functions.
* protocols/http2_protocol.h
(serf_http2__max_payload_size,
serf_http2__alloc_window,
serf_http2__return_window): New functions.
* protocols/http2_stream.c
(includes): Add apr_strings.h.
(serf_http2_stream_data_t): Add more state.
(serf_http2__stream_create): Init state.
(stream_send_headers): New function, extracted from...
(serf_http2__stream_setup_next_request): ... here, which is now a caller.
(window_allocate_info_t): New struct.
(data_write_started,
data_write_done): New functions.
(stream_send_body): New function, based on some parts of stream_send_headers.
(set_hpack_header): New function.
(http2_stream_enqueue_response): Implement function.
(serf_http2__stream_handle_hpack): Cache hpack table, as needed for response.
(serf_http2__stream_processor): Remove duplicate definition of variable.
* test/test_server.c
(test_listen_http2): Expect APR_SUCCESS instead of APR_ENOTIMPL.
Modified:
serf/trunk/protocols/http2_protocol.c
serf/trunk/protocols/http2_protocol.h
serf/trunk/protocols/http2_stream.c
serf/trunk/test/test_server.c
Modified: serf/trunk/protocols/http2_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1714613&r1=1714612&r2=1714613&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Mon Nov 16 16:06:05 2015
@@ -1885,3 +1885,38 @@ serf_http2__setup_incoming_request(serf_
return APR_SUCCESS;
}
+/* Returns H2->lr_max_framesize, the same value serf_http2__alloc_window()
+   uses as the upper bound of a single window allocation. */
+apr_size_t
+serf_http2__max_payload_size(serf_http2_protocol_t *h2)
+{
+ return h2->lr_max_framesize;
+}
+
+/* Reserves send window for STREAM on connection H2.
+
+   REQUESTED is clamped to the smallest of H2->lr_max_framesize,
+   H2->lr_window and STREAM->lr_window. The resulting amount is subtracted
+   from both windows and returned; 0 means nothing could be reserved. */
+apr_size_t serf_http2__alloc_window(serf_http2_protocol_t *h2,
+                                    serf_http2_stream_t *stream,
+                                    apr_size_t requested)
+{
+    apr_size_t granted = requested;
+
+    if (granted > h2->lr_max_framesize)
+        granted = h2->lr_max_framesize;
+    if (granted > h2->lr_window)
+        granted = h2->lr_window;
+    if (granted > stream->lr_window)
+        granted = stream->lr_window;
+
+    /* Nothing available: leave both windows untouched */
+    if (granted == 0)
+        return 0;
+
+    h2->lr_window -= granted;
+    stream->lr_window -= granted;
+
+    return granted;
+}
+/* Hands RETURNED bytes of previously reserved window back by adding them to
+   both H2->lr_window and STREAM->lr_window. Asserts beforehand that neither
+   window would grow past HTTP2_WINDOW_MAX_ALLOWED. */
+void serf_http2__return_window(serf_http2_protocol_t *h2,
+ serf_http2_stream_t *stream,
+ apr_size_t returned)
+{
+ SERF_H2_assert(h2->lr_window + returned <= HTTP2_WINDOW_MAX_ALLOWED);
+ SERF_H2_assert(stream->lr_window + returned <= HTTP2_WINDOW_MAX_ALLOWED);
+ h2->lr_window += returned;
+ stream->lr_window += returned;
+}
Modified: serf/trunk/protocols/http2_protocol.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1714613&r1=1714612&r2=1714613&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Mon Nov 16 16:06:05 2015
@@ -204,6 +204,18 @@ serf_http2__setup_incoming_request(serf_
void **req_setup_baton,
serf_http2_protocol_t *h2);
+/* Gets the current l->r max payload size of H2 */
+apr_size_t
+serf_http2__max_payload_size(serf_http2_protocol_t *h2);
+/* Reserves up to REQUESTED bytes of l->r window for STREAM. The amount is
+   limited by the max payload size and by the connection and stream windows,
+   and is subtracted from both windows. Returns the reserved amount, which
+   may be 0. */
+apr_size_t serf_http2__alloc_window(serf_http2_protocol_t *h2,
+ serf_http2_stream_t *stream,
+ apr_size_t requested);
+/* Gives RETURNED bytes of reserved window back to both the connection
+   window of H2 and the window of STREAM. */
+void serf_http2__return_window(serf_http2_protocol_t *h2,
+ serf_http2_stream_t *stream,
+ apr_size_t returned);
+
+
apr_status_t
serf_http2__stream_reset(serf_http2_stream_t *stream,
apr_status_t reason,
Modified: serf/trunk/protocols/http2_stream.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1714613&r1=1714612&r2=1714613&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Mon Nov 16 16:06:05 2015
@@ -21,6 +21,7 @@
#include <stdlib.h>
#include <apr_pools.h>
+#include <apr_strings.h>
#include "serf.h"
#include "serf_bucket_util.h"
@@ -34,6 +35,8 @@ struct serf_http2_stream_data_t
serf_request_t *request; /* May be NULL as streams may outlive requests */
serf_incoming_request_t *in_request;
serf_bucket_t *response_agg;
+ serf_hpack_table_t *tbl;
+ serf_bucket_t *body_tail;
};
serf_http2_stream_t *
@@ -56,6 +59,8 @@ serf_http2__stream_create(serf_http2_pro
stream->data->request = NULL;
stream->data->in_request = NULL;
stream->data->response_agg = NULL;
+ stream->data->tbl = NULL;
+ stream->data->body_tail = NULL;
stream->lr_window = lr_window;
stream->rl_window = rl_window;
@@ -84,6 +89,195 @@ serf_http2__stream_cleanup(serf_http2_st
serf_bucket_mem_free(stream->alloc, stream);
}
+/* Queues the header block HPACK of STREAM as one HEADERS frame, followed by
+   CONTINUATION frames while more than MAX_PAYLOAD_SIZE remains.
+
+   The frame that leaves nothing behind gets HTTP2_FLAG_END_HEADERS; when
+   END_STREAM is true HTTP2_FLAG_END_STREAM is set on every frame queued
+   here. Returns the first status of serf_http2__enqueue_frame() that
+   SERF_BUCKET_READ_ERROR() flags, or APR_SUCCESS. */
+static apr_status_t stream_send_headers(serf_http2_stream_t *stream,
+                                        serf_bucket_t *hpack,
+                                        apr_size_t max_payload_size,
+                                        bool end_stream)
+{
+    int frame_type = HTTP2_FRAME_TYPE_HEADERS;
+    const int stream_flag = end_stream ? HTTP2_FLAG_END_STREAM : 0;
+
+    /* The HTTP/2 spec requires HEADERS and its CONTINUATION frames to be
+       sent directly after each other, without other frames in between,
+       so the whole block is scheduled here in one go. */
+    while (hpack != NULL)
+    {
+        serf_bucket_t *chunk;
+        serf_bucket_t *frame;
+        apr_status_t status;
+        int flags = stream_flag;
+
+        /* hpack buckets implement get_remaining. And even if they didn't,
+           adding the framing around them would apply some reads that fix
+           the buckets. For both reasons the theoretical endless loop can
+           be ignored here. */
+        if (serf_bucket_get_remaining(hpack) > max_payload_size) {
+            serf_bucket_split_create(&chunk, &hpack, hpack,
+                                     max_payload_size - (max_payload_size / 4),
+                                     max_payload_size);
+        }
+        else {
+            chunk = hpack;
+            hpack = NULL;
+        }
+
+        /* Nothing left after this chunk: it closes the header block */
+        if (hpack == NULL)
+            flags |= HTTP2_FLAG_END_HEADERS;
+
+        frame = serf__bucket_http2_frame_create(chunk, frame_type, flags,
+                                                &stream->streamid,
+                                                serf_http2__allocate_stream_id,
+                                                stream,
+                                                max_payload_size,
+                                                chunk->allocator);
+        status = serf_http2__enqueue_frame(stream->h2, frame, TRUE);
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status; /* Connection dead */
+
+        /* Everything after the first frame is a 'continuation' frame */
+        frame_type = HTTP2_FRAME_TYPE_CONTINUATION;
+    }
+
+    return APR_SUCCESS;
+}
+/* Baton for the event callbacks data_write_started() and data_write_done()
+   that stream_send_body() wraps around a partial DATA payload. */
+typedef struct window_allocate_info_t
+{
+ serf_http2_stream_t *stream; /* Stream (and via ->h2 the connection) owning the window */
+ serf_bucket_t *bkt; /* Bucket whose remaining length data_write_started() queries */
+ apr_size_t allocated; /* Reserved window in bytes; set to 0 once the surplus is returned */
+} window_allocate_info_t;
+
+/* Event callback, registered by stream_send_body() as the 'started' handler
+   of the bucket wrapping a partial DATA payload.
+
+   The incoming BYTES_READ value is ignored and replaced by the remaining
+   length of the bucket stored in the baton. When that length fits in the
+   reserved window, the unused part of the reservation is handed back via
+   serf_http2__return_window() and the reservation is marked as settled
+   (allocated = 0), so data_write_done() won't return it again. */
+static apr_status_t data_write_started(void *baton,
+                                       apr_uint64_t bytes_read)
+{
+    window_allocate_info_t *info = baton;
+    apr_size_t surplus;
+
+    bytes_read = serf_bucket_get_remaining(info->bkt);
+
+    /* Longer than the reservation: nothing to give back. Presumably an
+       unavailable length is reported as a huge value and is handled by
+       this same test for free - TODO confirm. */
+    if (bytes_read > info->allocated)
+        return APR_SUCCESS;
+
+    surplus = info->allocated - (apr_size_t)bytes_read;
+    serf_http2__return_window(info->stream->h2, info->stream, surplus);
+    info->allocated = 0;
+
+    return APR_SUCCESS;
+}
+
+/* Event callback, registered by stream_send_body() as the 'done' handler of
+   the bucket wrapping a partial DATA payload.
+
+   If the reservation in the baton is still open (allocated != 0) and
+   BYTES_READ does not exceed it, the unused remainder is returned to the
+   connection and stream windows. The baton is always freed. Returns
+   APR_SUCCESS. */
+static apr_status_t data_write_done(void *baton,
+                                    apr_uint64_t bytes_read)
+{
+    window_allocate_info_t *wai = baton;
+
+    if (wai->allocated && bytes_read <= wai->allocated) {
+        apr_size_t surplus = (wai->allocated - (apr_size_t)bytes_read);
+
+        /* serf_http2__return_window() credits both the connection window
+           and the stream window. The stream window must not be adjusted
+           here as well, or the surplus would be returned to it twice. */
+        serf_http2__return_window(wai->stream->h2, wai->stream, surplus);
+
+        wai->allocated = 0;
+    }
+
+    serf_bucket_mem_free(wai->stream->alloc, wai);
+    return APR_SUCCESS;
+}
+
+
+/* Queues (part of) BODY as a DATA frame on STREAM.
+
+   - BODY is NULL or reports nothing remaining: an empty DATA frame with
+     HTTP2_FLAG_END_STREAM is queued and the stream moves to
+     H2S_HALFCLOSED_LOCAL (from H2S_OPEN) or H2S_CLOSED.
+   - No send window could be reserved: BODY is parked in
+     stream->data->body_tail and APR_SUCCESS is returned without queueing.
+   - The reserved window is smaller than the body: the part that fits is
+     queued, the rest is kept in stream->data->body_tail and APR_ENOTIMPL
+     is returned, as sending the tail later is not implemented yet.
+   - Otherwise the whole body is queued with HTTP2_FLAG_END_STREAM.
+
+   A status of serf_http2__enqueue_frame() that SERF_BUCKET_READ_ERROR()
+   flags is always returned as-is. */
+static apr_status_t stream_send_body(serf_http2_stream_t *stream,
+                                     serf_bucket_t *body)
+{
+    apr_uint64_t remaining;
+    serf_bucket_t *next;
+    apr_size_t prefix_len;
+    bool end_stream;
+    apr_status_t status;
+
+    SERF_H2_assert(stream->status == H2S_OPEN
+                   || stream->status == H2S_HALFCLOSED_REMOTE);
+    SERF_H2_assert(!stream->data->body_tail
+                   || (body == stream->data->body_tail));
+
+    /* Sending DATA frames over HTTP/2 is not easy as this usually requires
+       handling windowing, priority, etc. This code will improve over time */
+
+    if (!body)
+        remaining = 0;
+    else
+        remaining = serf_bucket_get_remaining(body);
+
+    /* If the stream decided we are already done */
+    if (remaining == 0) {
+        if (stream->status == H2S_OPEN)
+            stream->status = H2S_HALFCLOSED_LOCAL;
+        else
+            stream->status = H2S_CLOSED;
+
+        next = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_DATA,
+                                               HTTP2_FLAG_END_STREAM,
+                                               &stream->streamid,
+                                               serf_http2__allocate_stream_id,
+                                               stream, 0, stream->alloc);
+        stream->data->body_tail = NULL;
+        return serf_http2__enqueue_frame(stream->h2, next, false);
+    }
+
+    /* Reserve as much window as the body needs; the request is capped when
+       the length doesn't fit in an apr_size_t */
+    prefix_len = serf_http2__alloc_window(stream->h2, stream,
+                                          (remaining >= APR_SIZE_MAX)
+                                            ? SERF_READ_ALL_AVAIL
+                                            : (apr_size_t)remaining);
+
+    if (prefix_len == 0) {
+        /* No window left */
+        stream->data->body_tail = body;
+        return APR_SUCCESS;
+    }
+
+    if (prefix_len < remaining) {
+        window_allocate_info_t *wai;
+
+        /* BODY becomes the head that is sent now; the tail is kept */
+        serf_bucket_split_create(&body, &stream->data->body_tail, body,
+                                 MIN(prefix_len, 1024), prefix_len);
+
+        wai = serf_bucket_mem_alloc(stream->alloc, sizeof(*wai));
+        wai->stream = stream;
+        /* data_write_started() asks this bucket for its remaining length,
+           so it has to be set before the callbacks can fire */
+        wai->bkt = body;
+        wai->allocated = prefix_len;
+
+        body = serf__bucket_event_create(body, wai,
+                                         data_write_started,
+                                         data_write_done, NULL, stream->alloc);
+        end_stream = false;
+    }
+    else
+        end_stream = true;
+
+    /* NOTE(review): unlike the empty-body path above, stream->status is not
+       updated here when END_STREAM is sent - confirm this is intended. */
+    next = serf__bucket_http2_frame_create(body, HTTP2_FRAME_TYPE_DATA,
+                                           end_stream ? HTTP2_FLAG_END_STREAM
+                                                      : 0,
+                                           &stream->streamid,
+                                           serf_http2__allocate_stream_id,
+                                           stream, prefix_len,
+                                           body->allocator);
+
+    status = serf_http2__enqueue_frame(stream->h2, next, TRUE);
+
+    /* Don't hide a failed enqueue behind APR_ENOTIMPL */
+    if (SERF_BUCKET_READ_ERROR(status))
+        return status; /* Connection dead */
+
+    if (!end_stream)
+        return APR_ENOTIMPL; /* The rest of the body won't be sent yet */
+
+    return status;
+}
+
apr_status_t
serf_http2__stream_setup_next_request(serf_http2_stream_t *stream,
serf_connection_t *conn,
@@ -94,7 +288,6 @@ serf_http2__stream_setup_next_request(se
apr_status_t status;
serf_bucket_t *hpack;
serf_bucket_t *body;
- bool first_frame;
bool end_stream;
SERF_H2_assert(request != NULL);
@@ -137,58 +330,10 @@ serf_http2__stream_setup_next_request(se
else
end_stream = false;
- first_frame = true;
-
- /* And now schedule the packet for writing. Note that it is required
- by the HTTP/2 spec to send HEADERS and CONTINUATION directly after
- each other, without other frames inbetween. */
- while (hpack != NULL)
- {
- serf_bucket_t *next;
- apr_uint64_t remaining;
-
- /* hpack buckets implement get_remaining. And if they didn't adding the
- framing around them would apply some reads that fix the buckets.
-
- So we can ignore the theoretical endless loop here for two different
- reasons
- */
- remaining = serf_bucket_get_remaining(hpack);
-
- if (remaining > max_payload_size) {
- serf_bucket_split_create(&next, &hpack, hpack,
- max_payload_size - (max_payload_size / 4),
- max_payload_size);
- }
- else
- {
- next = hpack;
- hpack = NULL;
- }
-
- next = serf__bucket_http2_frame_create(next,
- first_frame
- ? HTTP2_FRAME_TYPE_HEADERS
- : HTTP2_FRAME_TYPE_CONTINUATION,
- (end_stream
- ? HTTP2_FLAG_END_STREAM
- : 0)
- | ((hpack != NULL)
- ? 0
- : HTTP2_FLAG_END_HEADERS),
- &stream->streamid,
- serf_http2__allocate_stream_id,
- stream,
- max_payload_size,
- request->allocator);
- status = serf_http2__enqueue_frame(stream->h2, next, TRUE);
-
- if (SERF_BUCKET_READ_ERROR(status))
- return status; /* Connection dead */
-
- first_frame = false; /* Continue with 'continuation' frames */
- }
-
+ status = stream_send_headers(stream, hpack, max_payload_size,
+ end_stream);
+ if (status)
+ return status;
if (end_stream) {
stream->status = H2S_HALFCLOSED_LOCAL; /* Headers sent; no body */
@@ -235,14 +380,69 @@ stream_response_eof(void *baton,
}
}
+/* Callback passed to serf_bucket_headers_do(): sets header KEY to VALUE on
+   the hpack bucket passed as BATON. Always returns 0. */
+static int set_hpack_header(void *baton,
+                            const char *key,
+                            const char *value)
+{
+    serf__bucket_hpack_setc((serf_bucket_t *)baton, key, value);
+
+    return 0;
+}
+
+/* Sends the HTTP/1 style response in RESPONSE_BKT as HTTP/2 frames on the
+   stream passed as ENQUEUE_BATON.
+
+   The status line and headers are parsed through a response bucket and
+   sent as ":status" plus the parsed headers via stream_send_headers();
+   RESPONSE_BKT is then handed to stream_send_body(). Returns APR_EGENERAL
+   when reading the status line or the headers fails, otherwise the status
+   of the send helpers. */
static apr_status_t
http2_stream_enqueue_response(serf_incoming_request_t *request,
void *enqueue_baton,
serf_bucket_t *response_bkt)
{
serf_http2_stream_t *stream = enqueue_baton;
+    serf_bucket_t *hpack;
+    serf_bucket_t *headers;
+    serf_bucket_t *h1_response;
+    serf_status_line sline;
+    apr_status_t status;
+
+    /* OK, this could be implemented using knowledge of the buckets, in
+       a far more efficient way, but I don't want to introduce new bucket
+       types for this yet. Let's just read everything the http/1 way
+       and put it in HTTP/2 appropriate places */
+    h1_response = serf_bucket_response_create(response_bkt,
+                                              stream->alloc);
+
+    /* Keep retrying until the status line is there, but give up on what
+       SERF_BUCKET_READ_ERROR() flags as an error: retrying on every
+       non-success status would spin forever in that case and would make
+       the check below unreachable. */
+    do
+    {
+        status = serf_bucket_response_status(h1_response, &sline);
+    } while (status != APR_SUCCESS && !SERF_BUCKET_READ_ERROR(status));
+
+    if (status != APR_SUCCESS)
+        return APR_EGENERAL; /* Can't read statusline. No EAGAIN support
+                                before the body (yet) */
+
+    hpack = serf__bucket_hpack_create(stream->data->tbl, stream->alloc);
+    serf__bucket_hpack_setc(hpack, ":status",
+                            apr_itoa(stream->data->in_request->pool,
+                                     sline.code));
- return APR_ENOTIMPL;
+    /* Same retry policy as for the status line */
+    do
+    {
+        status = serf_bucket_response_wait_for_headers(h1_response);
+    } while (status != APR_SUCCESS && !SERF_BUCKET_READ_ERROR(status));
+
+    if (status != APR_SUCCESS)
+        return APR_EGENERAL; /* Can't read headers. No EAGAIN support before
+                                the body (yet) */
+
+    headers = serf_bucket_response_get_headers(h1_response);
+
+    /* Copy every parsed header into the hpack bucket */
+    serf_bucket_headers_do(headers, set_hpack_header, hpack);
+
+    status = stream_send_headers(stream, hpack,
+                                 serf_http2__max_payload_size(stream->h2),
+                                 false);
+
+    if (status)
+        return status;
+
+    return stream_send_body(stream, response_bkt);
}
static apr_status_t
@@ -380,6 +580,8 @@ serf_http2__stream_handle_hpack(serf_htt
if (!stream->data->response_agg)
stream_setup_response(stream, config);
+ stream->data->tbl = hpack_tbl;
+
bucket = serf__bucket_hpack_decode_create(bucket, NULL, NULL,
max_entry_size,
hpack_tbl, allocator);
@@ -530,7 +732,7 @@ serf_http2__stream_processor(void *baton
return status;
if (APR_STATUS_IS_EOF(status)) {
- apr_status_t status = serf_incoming_response_create(request);
+ status = serf_incoming_response_create(request);
if (status)
return status;
Modified: serf/trunk/test/test_server.c
URL:
http://svn.apache.org/viewvc/serf/trunk/test/test_server.c?rev=1714613&r1=1714612&r2=1714613&view=diff
==============================================================================
--- serf/trunk/test/test_server.c (original)
+++ serf/trunk/test/test_server.c Mon Nov 16 16:06:05 2015
@@ -238,7 +238,7 @@ void test_listen_http2(CuTest *tc)
status = run_client_server_loop(tb, num_requests,
handler_ctx, tb->pool);
- CuAssertIntEquals(tc, APR_ENOTIMPL, status);
+ CuAssertIntEquals(tc, APR_SUCCESS, status);
}
/*****************************************************************************/