Author: rhuijben Date: Sat Oct 31 19:53:57 2015 New Revision: 1711679 URL: http://svn.apache.org/viewvc?rev=1711679&view=rev Log: Checkpoint a rewrite of the prototype http2 protocol handler into a fully event-driven protocol handler.
This patch starts delegating reading of the headers and response to the stream handler, which was already responsible for writing out the request. * protocols/http2_protocol.c (http2_process): New prototype. (serf_http2_protocol_t): Store settings. Update state for event-driven handling. (serf__http2_protocol_init): Update init. (setup_for_http2): Update caller. (serf_http2__enqueue_frame): Destroy bucket on error. (http2_handle_priority, http2_handle_promise, http2_handle_frame_reset, http2_handle_stream_window_update, http2_handle_connection_window_update, http2_handle_ping, http2_handle_ping_ack, http2_handle_settings, http2_handle_goaway): New functions handling specific frame data. (http2_handle_continuation, http2_end_of_frame, http2_bucket_processor): New callbacks for specific bucket events. (http2_read): Rename to... (http2_process): ... this, and implement the logic in a completely different way. (http2_protocol_read): Update caller. (serf_http2__allocate_stream_id): Tweak for variable name updates. (serf_http2__stream_get): Resolve endless loop. Tweak varnames. (serf_http2__enqueue_stream_reset): New stub function. * protocols/http2_protocol.h (SERF_H2_assert): New define. (HTTP2_DEFAULT_MAX_CONCURRENT, HTTP2_PRIORITY_DATA_SIZE, HTTP2_RST_DATA_SIZE, HTTP2_PROMISE_DATA_SIZE, HTTP2_PING_DATA_SIZE, HTTP2_GOWAWAY_DATA_SIZE, HTTP2_WINDOW_UPDATE_DATA_SIZE, HTTP2_SETTING_SIZE): New defines for magic values. (serf_http2_stream_data_t): New typedef. (serf_http2_stream_t): Hide some data inside serf_http2_stream_data_t. (serf_http2_processor_t): New typedef. (serf_http2__enqueue_stream_reset): New function. (serf_http2__stream_get_by_id): Rename to... (serf_http2__stream_get): ... this to match implementation. (serf_http2__stream_reset, serf_http2__stream_handle_hpack, serf_http2__stream_handle_data, serf_http2__stream_processor): New functions. * protocols/http2_stream.c (serf_http2_stream_data_t): New struct. (serf_http2__stream_create): Init data struct.
(serf_http2__stream_cleanup): Clean up data struct. (serf_http2__stream_setup_request): Update caller. (serf_http2__stream_reset, stream_response_eof, serf_http2__stream_handle_hpack, serf_http2__stream_handle_data, serf_http2__stream_processor): New functions. Modified: serf/trunk/protocols/http2_protocol.c serf/trunk/protocols/http2_protocol.h serf/trunk/protocols/http2_stream.c Modified: serf/trunk/protocols/http2_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1711679&r1=1711678&r2=1711679&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.c (original) +++ serf/trunk/protocols/http2_protocol.c Sat Oct 31 19:53:57 2015 @@ -43,6 +43,9 @@ http2_protocol_hangup(serf_connection_t static void http2_protocol_teardown(serf_connection_t *conn); +static apr_status_t +http2_process(serf_http2_protocol_t *h2); + static serf_bucket_t * serf_bucket_create_numberv(serf_bucket_alloc_t *allocator, const char *format, ...) { @@ -132,26 +135,42 @@ struct serf_http2_protocol_t apr_pool_t *pool; serf_connection_t *conn; serf_bucket_t *ostream; + serf_bucket_alloc_t *allocator; - serf_hpack_table_t *hpack_tbl; + serf_http2_processor_t processor; + void *processor_baton; + serf_bucket_t *read_frame; /* Frame currently being read */ + int in_frame; - apr_uint32_t default_lr_window; - apr_uint32_t default_rl_window; + serf_hpack_table_t *hpack_tbl; + serf_config_t *config; - apr_int64_t lr_window; /* local->remote */ - apr_int64_t rl_window; /* remote->local */ - apr_int32_t next_local_streamid; - apr_int32_t next_remote_streamid; + /* Local -> Remote. Settings provided by other side */ + apr_uint32_t lr_default_window; + apr_uint32_t lr_window; + apr_uint32_t lr_max_framesize; + apr_uint32_t lr_max_headersize; + apr_uint32_t lr_max_concurrent; + apr_int32_t lr_next_streamid; + char lr_push_enabled; + + /* Remote -> Local. Settings set by us.
Acknowledged by other side */ + apr_uint32_t rl_default_window; + apr_uint32_t rl_window; + apr_uint32_t rl_max_framesize; + apr_uint32_t rl_max_headersize; + apr_uint32_t rl_max_concurrent; + apr_int32_t rl_next_streamid; + char rl_push_enabled; serf_http2_stream_t *first; serf_http2_stream_t *last; - char buffer[HTTP2_DEFAULT_MAX_FRAMESIZE]; - apr_size_t buffer_used; - serf_bucket_t *cur_frame; - serf_bucket_t *cur_payload; - int in_payload; + int setting_acks; + int enforce_flow_control; + serf_bucket_t *continuation_bucket; + apr_int32_t continuation_streamid; }; static apr_status_t @@ -179,6 +198,7 @@ void serf__http2_protocol_init(serf_conn serf_http2_protocol_t *ctx; apr_pool_t *protocol_pool; serf_bucket_t *tmp; + const int WE_ARE_CLIENT = 1; apr_pool_create(&protocol_pool, conn->pool); @@ -186,15 +206,30 @@ void serf__http2_protocol_init(serf_conn ctx->pool = protocol_pool; ctx->conn = conn; ctx->ostream = conn->ostream_tail; + ctx->allocator = conn->allocator; + ctx->config = conn->config; /* Defaults until negotiated */ - ctx->default_lr_window = HTTP2_DEFAULT_WINDOW_SIZE; - ctx->default_rl_window = HTTP2_DEFAULT_WINDOW_SIZE; - - ctx->lr_window = ctx->default_lr_window; - ctx->rl_window = ctx->default_rl_window; - ctx->next_local_streamid = 1; /* 2 if we would be the server */ - ctx->next_remote_streamid = 2; /* 1 if we would be the client */ + ctx->rl_default_window = HTTP2_DEFAULT_WINDOW_SIZE; + ctx->rl_window = HTTP2_DEFAULT_WINDOW_SIZE; + ctx->rl_next_streamid = WE_ARE_CLIENT ? 2 : 1; + ctx->rl_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE; + ctx->rl_max_headersize = APR_UINT32_MAX; + ctx->rl_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT; + ctx->rl_push_enabled = TRUE; + + ctx->lr_default_window = HTTP2_DEFAULT_WINDOW_SIZE; + ctx->lr_window = HTTP2_DEFAULT_WINDOW_SIZE; + ctx->lr_next_streamid = WE_ARE_CLIENT ? 
1 : 2; + ctx->lr_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE; + ctx->lr_max_headersize = APR_UINT32_MAX; + ctx->lr_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT; + ctx->lr_push_enabled = TRUE; + + ctx->setting_acks = 0; + ctx->enforce_flow_control = TRUE; + ctx->continuation_bucket = NULL; + ctx->continuation_streamid = 0; ctx->first = ctx->last = NULL; @@ -253,9 +288,9 @@ setup_for_http2(serf_http2_protocol_t *h serf_http2_stream_t *stream; stream = serf_http2__stream_create(h2, -1, - h2->default_lr_window, - h2->default_rl_window, - h2->conn->allocator); + h2->lr_default_window, + h2->rl_default_window, + h2->allocator); if (h2->first) { @@ -287,7 +322,10 @@ serf_http2__enqueue_frame(serf_http2_pro status = serf_bucket_peek(h2->ostream, &data, &len); if (SERF_BUCKET_READ_ERROR(status)) - return status; + { + serf_bucket_destroy(frame); + return status; + } if (len == 0) { @@ -322,178 +360,871 @@ serf_http2__enqueue_frame(serf_http2_pro return APR_SUCCESS; } +/* Implements serf_bucket_prefix_handler_t. + Handles PRIORITY frames and the priority prefix of HEADERS frames */ +static apr_status_t +http2_handle_priority(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_http2_stream_t *stream = baton; + + if (len != HTTP2_PRIORITY_DATA_SIZE) + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + + if (stream == NULL) + return APR_SUCCESS; /* Nothing to record this on */ + /* ### TODO: Store priority information on stream */ + SERF_H2_assert(stream->h2 != NULL); + + return APR_SUCCESS; +} + +/* Implements serf_bucket_prefix_handler_t. 
+ Handles the promise prefix of PUSH_PROMISE frames */ static apr_status_t -http2_read(serf_connection_t *conn) +http2_handle_promise(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) { - serf_http2_protocol_t *ctx = conn->protocol_baton; - apr_status_t status = APR_SUCCESS; + serf_http2_stream_t *stream = baton; - while (TRUE) - { - status = APR_SUCCESS; + if (len != HTTP2_PROMISE_DATA_SIZE) + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; - if (ctx->cur_frame) - { - const char *data; - apr_size_t len; + /* ### TODO: Prepare reading promise on stream */ + SERF_H2_assert(stream->h2 != NULL); - if (! ctx->in_payload) - { - unsigned char flags; - unsigned char frametype; - apr_int32_t streamid; - apr_uint64_t size; - - status = serf__bucket_http2_unframe_read_info(ctx->cur_frame, - &streamid, &frametype, - &flags); + return APR_SUCCESS; +} - if (status && !APR_STATUS_IS_EOF(status)) - break; +/* Implements serf_bucket_prefix_handler_t. + Handles the payload of RST_STREAM frames */ +static apr_status_t +http2_handle_frame_reset(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_http2_stream_t *stream = baton; - size = serf_bucket_get_remaining(ctx->cur_frame); + if (len != HTTP2_RST_DATA_SIZE) + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; - serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config, - "Start 0x%02x http2 frame on stream 0x%x, flags=0x%x, size=0x%x\n", - (int)frametype, (int)streamid, (int)flags, (int)size); - - ctx->in_payload = TRUE; - - if (flags & HTTP2_FLAG_PADDED) - { - ctx->cur_payload = - serf__bucket_http2_unpad_create( - ctx->cur_frame, TRUE, - ctx->cur_frame->allocator); - } - else - ctx->cur_payload = ctx->cur_frame; + SERF_H2_assert(stream->h2 != NULL); - if (frametype == HTTP2_FRAME_TYPE_HEADERS) - { - ctx->cur_payload = serf__bucket_hpack_decode_create( - ctx->cur_payload, - NULL, NULL, - 16384, ctx->hpack_tbl, - ctx->cur_frame->allocator); - } - } + /* ### TODO: Handle error code,
etc. */ + stream->status = H2S_CLOSED; - status = serf_bucket_read(ctx->cur_payload, - sizeof(ctx->buffer) - ctx->buffer_used, - &data, &len); + return APR_SUCCESS; +} - if (SERF_BUCKET_READ_ERROR(status)) - break; +/* Implements serf_bucket_prefix_handler_t. + Handles WINDOW_UPDATE frames when they apply to a stream */ +static apr_status_t +http2_handle_stream_window_update(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_http2_stream_t *stream = baton; - if (len) - { - memcpy(&ctx->buffer[ctx->buffer_used], data, len); - ctx->buffer_used += len; - } + if (len != HTTP2_WINDOW_UPDATE_DATA_SIZE) + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; - if (APR_STATUS_IS_EOF(status)) - { - apr_int32_t streamid; - unsigned char frametype; - unsigned char flags; - - serf__bucket_http2_unframe_read_info(ctx->cur_frame, - &streamid, &frametype, - &flags); - serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config, - "Done 0x%02x http2 frame on stream 0x%x, flags=0x%x, size=0x%x\n", - (int)frametype, (int)streamid, (int)flags, (int)ctx->buffer_used); - - if (frametype == HTTP2_FRAME_TYPE_DATA - || frametype == HTTP2_FRAME_TYPE_HEADERS) - { - /* Ugly hack to dump body. Memory LEAK! */ - serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config, - "%s\n", apr_pstrmemdup(conn->pool, ctx->buffer, ctx->buffer_used)); - } - - if (frametype == HTTP2_FRAME_TYPE_GOAWAY && conn) - serf__log(LOGLVL_WARNING, SERF_LOGHTTP2, conn->config, - "Go away reason %d: %s\n", ctx->buffer[7], - apr_pstrmemdup(conn->pool, - &ctx->buffer[8], - (ctx->buffer_used >= 8) - ? ctx->buffer_used-8 : 0)); - - if (frametype == HTTP2_FRAME_TYPE_RST_STREAM && conn) - serf__log(LOGLVL_WARNING, SERF_LOGHTTP2, conn->config, - "Reset reason %d: %s\n", ctx->buffer[7], - apr_pstrmemdup(conn->pool, - &ctx->buffer[8], - (ctx->buffer_used >= 8) - ? 
ctx->buffer_used - 8 : 0)); - - if (frametype == HTTP2_FRAME_TYPE_SETTINGS - && !(flags & HTTP2_FLAG_ACK)) - { - /* Always ack settings */ - serf_http2__enqueue_frame( - ctx, + SERF_H2_assert(stream->h2 != NULL); + + return APR_SUCCESS; +} + +/* Implements serf_bucket_prefix_handler_t. + Handles WINDOW_UPDATE frames when they apply to the connection */ +static apr_status_t +http2_handle_connection_window_update(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_http2_protocol_t *h2 = baton; + + if (len != HTTP2_WINDOW_UPDATE_DATA_SIZE) + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + + SERF_H2_assert(h2 != NULL); + + return APR_SUCCESS; +} + +/* Implements serf_bucket_prefix_handler_t. + Handles PING frames for pings initiated remotely */ +static apr_status_t +http2_handle_ping(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_http2_protocol_t *h2 = baton; + serf_bucket_t *body; + apr_status_t status; + + if (len != HTTP2_PING_DATA_SIZE) + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + + SERF_H2_assert(h2 != NULL); + + /* Reply with a PONG (=PING + ACK) with the same data*/ + + body = serf_bucket_simple_copy_create(data, len, + h2->allocator); + + status = serf_http2__enqueue_frame( + h2, + serf__bucket_http2_frame_create(body, + HTTP2_FRAME_TYPE_PING, + HTTP2_FLAG_ACK, + NULL, NULL, NULL, + h2->lr_max_framesize, + NULL, NULL, + h2->allocator), + TRUE /* pump */); + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + return APR_SUCCESS; +} + +/* Implements serf_bucket_prefix_handler_t. + Handles PING frames for pings initiated locally */ +static apr_status_t +http2_handle_ping_ack(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_http2_protocol_t *h2 = baton; + if (len != HTTP2_PING_DATA_SIZE) + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + + SERF_H2_assert(h2 != NULL); + + /* Did we send a ping? 
*/ + + return APR_SUCCESS; +} + +/* Implements serf_bucket_prefix_handler_t. + Handles SETTINGS frames */ +static apr_status_t +http2_handle_settings(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_http2_protocol_t *h2 = baton; + apr_size_t i; + const struct setting_t + { + unsigned char s1, s0; + unsigned char v3, v2, v1, v0; + } *setting; + + if ((len % HTTP2_SETTING_SIZE) != 0) + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + + /* ### TODO: Handle settings */ + setting = (const void *)data; + for (i = 0, setting = (const void *)data; + i < len; + i += sizeof(*setting), setting++) + { + apr_uint16_t id = (setting->s1 << 8) | setting->s0; + apr_uint32_t value = (setting->v3 << 24) | (setting->v2 << 16) + | (setting->v1 << 8) | setting->v0; + + switch (id) + { + case HTTP2_SETTING_HEADER_TABLE_SIZE: + /* TODO: Send to hpack table */ + serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, + "Setting HPACK Table size to %u\n", value); + break; + case HTTP2_SETTING_ENABLE_PUSH: + serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, + "Setting Push enabled: %u\n", value); + h2->lr_push_enabled = (value != 0); + break; + case HTTP2_SETTING_MAX_CONCURRENT_STREAMS: + serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, + "Setting Max Concurrent %u\n", value); + h2->lr_max_concurrent = value; + break; + case HTTP2_SETTING_INITIAL_WINDOW_SIZE: + /* Sanitize? */ + serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, + "Setting Initial Window Size %u\n", value); + h2->lr_default_window = value; + break; + case HTTP2_SETTING_MAX_FRAME_SIZE: + /* Sanitize? 
*/ + serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, + "Setting Max framesize %u\n", value); + h2->lr_max_framesize = value; + break; + case HTTP2_SETTING_MAX_HEADER_LIST_SIZE: + serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, + "Setting Max header list size %u\n", value); + h2->lr_max_headersize = value; + break; + default: + /* An endpoint that receives a SETTINGS frame with any unknown + or unsupported identifier MUST ignore that setting. */ + serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, + "Ignoring unknown setting %d, value %u\n", id, value); + break; + } + } + + /* Always ack settings */ + serf_http2__enqueue_frame( + h2, serf__bucket_http2_frame_create( NULL, HTTP2_FRAME_TYPE_SETTINGS, HTTP2_FLAG_ACK, NULL, NULL, NULL, HTTP2_DEFAULT_MAX_FRAMESIZE, - NULL, NULL, conn->allocator), - TRUE); - } - else if (frametype == HTTP2_FRAME_TYPE_DATA) - { - /* Provide a bit of window space to the server after - receiving data */ - serf_http2__enqueue_frame( - ctx, - serf__bucket_http2_frame_create( - serf_bucket_create_numberv(conn->allocator, "4", (apr_int32_t)16384), - HTTP2_FRAME_TYPE_WINDOW_UPDATE, 0, - &streamid, NULL, NULL, - HTTP2_DEFAULT_MAX_FRAMESIZE, - NULL, NULL, conn->allocator), + NULL, NULL, h2->allocator), TRUE); - } - else if (frametype == HTTP2_FRAME_TYPE_PING) - { - /* TODO: PONG (=Ping Ack) */ - } - - serf_bucket_destroy(ctx->cur_payload); - ctx->cur_frame = ctx->cur_payload = NULL; - ctx->in_payload = FALSE; - ctx->buffer_used = 0; + + return APR_SUCCESS; +} + +/* Implements serf_bucket_prefix_handler_t. 
+ Handles GOAWAY frames */ +static apr_status_t +http2_handle_goaway(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_http2_protocol_t *h2 = baton; + + SERF_H2_assert(h2 != NULL); + + return APR_SUCCESS; +} + + +/* Implements serf_bucket_aggregate_eof_t */ +static apr_status_t +http2_handle_continuation(void *baton, + serf_bucket_t *aggregate_bucket) +{ + serf_http2_protocol_t *h2 = baton; + apr_status_t status; + const char *data; + apr_size_t len; + + if (h2->continuation_bucket != aggregate_bucket) + return APR_EOF; /* This is all we have */ + + SERF_H2_assert(h2->read_frame == NULL); + SERF_H2_assert(h2->continuation_bucket == aggregate_bucket); + + status = http2_process(h2); + if (status) + return status; + + if (h2->continuation_bucket == aggregate_bucket) + { + /* We expect more data in the future. Something + was done in http2_process() or it didn't + return APR_SUCCESS */ + return APR_SUCCESS; + } + + /* As h2->continuation_bucket is no longer attached we don't + recurse on peeking. 
Just check if there is more */ + return serf_bucket_peek(aggregate_bucket, &data, &len); +} + +/* Implements the serf__bucket_http2_unframe_set_eof callback */ +static apr_status_t +http2_end_of_frame(void *baton, + serf_bucket_t *frame) +{ + serf_http2_protocol_t *h2 = baton; + + SERF_H2_assert(h2->read_frame == frame); + h2->read_frame = NULL; + h2->in_frame = FALSE; + h2->processor = NULL; + h2->processor_baton = NULL; + + return APR_SUCCESS; +} + +/* Implements serf_http2_processor_t */ +static apr_status_t +http2_bucket_processor(void *baton, + serf_http2_protocol_t *h2, + serf_bucket_t *frame_bucket) +{ + struct iovec vecs[16]; + int vecs_used; + serf_bucket_t *payload = baton; + apr_status_t status; + + status = serf_bucket_read_iovec(payload, SERF_READ_ALL_AVAIL, 16, + vecs, &vecs_used); + + if (APR_STATUS_IS_EOF(status)) + { + SERF_H2_assert(!h2->in_frame && !h2->read_frame); + serf_bucket_destroy(payload); + } + + return status; +} + +/* Processes incoming HTTP2 data */ +static apr_status_t +http2_process(serf_http2_protocol_t *h2) +{ + while (TRUE) + { + apr_status_t status; + serf_bucket_t *body; + + if (h2->processor) + { + status = h2->processor(h2->processor_baton, h2, h2->read_frame); + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + else if (APR_STATUS_IS_EOF(status)) + { + /* ### frame ended */ + SERF_H2_assert(h2->read_frame == NULL); + h2->processor = NULL; + h2->processor_baton = NULL; } - else - continue; + else if (h2->in_frame) + { + if (status) + return status; + else + continue; + } + } + else + { + SERF_H2_assert(!h2->in_frame); } - if (APR_STATUS_IS_EOF(status)) + body = h2->read_frame; + + if (! 
body) { - const char *data; - apr_size_t len; - status = serf_bucket_peek(conn->stream, &data, &len); + SERF_H2_assert(!h2->in_frame); + + body = serf__bucket_http2_unframe_create( + h2->conn->stream, FALSE, + h2->rl_max_framesize, + h2->allocator); + + serf__bucket_http2_unframe_set_eof(body, + http2_end_of_frame, h2); - if (SERF_BUCKET_READ_ERROR(status) - || APR_STATUS_IS_EOF(status)) + serf_bucket_set_config(body, h2->config); + h2->read_frame = body; + } + + if (! h2->in_frame) + { + apr_int32_t sid; + unsigned char frametype; + unsigned char frameflags; + apr_size_t remaining; + serf_http2_processor_t process_handler = NULL; + void *process_baton = NULL; + serf_bucket_t *process_bucket = NULL; + serf_http2_stream_t *stream; + apr_uint32_t reset_reason; + + status = serf__bucket_http2_unframe_read_info(body, &sid, + &frametype, &frameflags); + + if (APR_STATUS_IS_EOF(status)) { - /* We have a real EOF*/ - break; + /* Entire frame is already read (just header) */ + SERF_H2_assert(h2->read_frame == NULL); + SERF_H2_assert(! h2->in_frame); + } + else if (status) + { + SERF_H2_assert(h2->read_frame != NULL); + SERF_H2_assert(! 
h2->in_frame); + return status; + } + else + { + h2->in_frame = TRUE; + SERF_H2_assert(h2->read_frame != NULL); } - } - ctx->cur_frame = ctx->cur_payload = - serf__bucket_http2_unframe_create(conn->stream, FALSE, - HTTP2_DEFAULT_MAX_FRAMESIZE, - conn->stream->allocator); - } + serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, + "Reading 0x%x frame, stream=%x, flags=0x%x\n", + frametype, sid, frameflags); - return status; + /* If status is EOF then the frame doesn't have/declare a body */ + switch (frametype) + { + /* ---------------------------------------------------- */ + case HTTP2_FRAME_TYPE_DATA: + case HTTP2_FRAME_TYPE_HEADERS: + case HTTP2_FRAME_TYPE_PUSH_PROMISE: + if (h2->continuation_bucket) + { + h2->continuation_bucket = NULL; + h2->continuation_streamid = 0; + return APR_EAGAIN; + } + + stream = serf_http2__stream_get(h2, sid, TRUE, TRUE); + + if (sid == 0) + { + /* DATA, HEADERS and PUSH_PROMISE: + + These frames MUST be associated with a stream. If a + XXX frame is received whose stream identifier field is 0x0, + the recipient MUST respond with a connection error + (Section 5.4.1) of type PROTOCOL_ERROR. 
*/ + return SERF_ERROR_HTTP2_PROTOCOL_ERROR; + } + + reset_reason = 0; + + if (frametype == HTTP2_FRAME_TYPE_DATA) + { + remaining = (apr_size_t)serf_bucket_get_remaining(body); + + if (h2->rl_window < remaining) + { + if (h2->enforce_flow_control) + reset_reason = SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR; + + h2->rl_window = 0; + } + else + h2->rl_window -= remaining; + + if (stream) + { + if (stream->rl_window < remaining) + { + if (h2->enforce_flow_control) + reset_reason = SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR; + + stream->rl_window = 0; + } + else + stream->rl_window -= remaining; + } + } + + /* DATA, HEADERS and PUSH_PROMISE can have padding */ + if (frameflags & HTTP2_FLAG_PADDED) + { + body = serf__bucket_http2_unpad_create(body, TRUE, + h2->allocator); + } + + /* An HEADERS frame can have an included priority 'frame' */ + if (frametype == HTTP2_FRAME_TYPE_HEADERS + && (frameflags & HTTP2_FLAG_PRIORITY)) + { + body = serf_bucket_prefix_create(body, + HTTP2_PRIORITY_DATA_SIZE, + http2_handle_priority, + stream, h2->allocator); + } + + if (frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE) + { + body = serf_bucket_prefix_create(body, + HTTP2_PROMISE_DATA_SIZE, + http2_handle_promise, + stream, h2->allocator); + } + + if (!stream) + { + if (!reset_reason) + reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED; + } + else + switch (frametype) + { + case HTTP2_FRAME_TYPE_DATA: + if (stream->status != H2S_OPEN + && stream->status != H2S_HALFCLOSED_LOCAL) + { + reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED; + } + break; + case HTTP2_FRAME_TYPE_HEADERS: + if (stream->status != H2S_IDLE + && stream->status != H2S_RESERVED_LOCAL + && stream->status != H2S_OPEN + && stream->status != H2S_HALFCLOSED_REMOTE) + { + reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED; + } + break; + case HTTP2_FRAME_TYPE_PUSH_PROMISE: + if (stream->status != H2S_OPEN + && stream->status != H2S_HALFCLOSED_REMOTE) + { + reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED; + } + break; + } + + if (reset_reason) + { + if 
(stream) + serf_http2__stream_reset(stream, reset_reason, TRUE); + else + serf_http2__enqueue_stream_reset(h2, sid, reset_reason); + } + + if (frametype == HTTP2_FRAME_TYPE_HEADERS + || frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE) + { + if (!(frameflags & HTTP2_FLAG_END_HEADERS)) + { + /* This header frame is *directly* followed by + continuation frames... We hide this from the + stream code, by providing an aggregate that will + read through the body of multiple frames */ + + h2->continuation_bucket = serf_bucket_aggregate_create( + h2->allocator); + h2->continuation_streamid = sid; + + serf_bucket_aggregate_append(h2->continuation_bucket, + body); + + serf_bucket_aggregate_hold_open( + h2->continuation_bucket, + http2_handle_continuation, h2); + } + + if (stream && !reset_reason) + { + body = serf_http2__stream_handle_hpack( + stream, body, frametype, + (frameflags & HTTP2_FLAG_END_STREAM), + h2->rl_max_headersize, + h2->hpack_tbl, h2->config, + h2->allocator); + } + else + { + /* Even when we don't want to process the headers we + must read them to update the HPACK state */ + body = serf__bucket_hpack_decode_create( + body, NULL, NULL, h2->rl_max_headersize, + h2->hpack_tbl, h2->allocator); + } + } + else /* We have a data bucket */ + { + body = serf_http2__stream_handle_data( + stream, body, frametype, + (frameflags & HTTP2_FLAG_END_STREAM), + h2->config, h2->allocator); + } + + if (body) + process_bucket = body; /* We will take care of discarding */ + else + { + /* The stream wants to handle the reading itself */ + process_handler = serf_http2__stream_processor; + process_baton = stream; + } + break; + + /* ---------------------------------------------------- */ + case HTTP2_FRAME_TYPE_PRIORITY: + if (sid == 0) + { + /* The PRIORITY frame always identifies a stream. 
If a + PRIORITY frame is received with a stream identifier of + 0x0, the recipient MUST respond with a connection error + (Section 5.4.1) of type PROTOCOL_ERROR.*/ + + return SERF_ERROR_HTTP2_PROTOCOL_ERROR; + } + else if (serf_bucket_get_remaining(body) + != HTTP2_PRIORITY_DATA_SIZE) + { + /* A PRIORITY frame with a length other than 5 octets MUST + be treated as a stream error (Section 5.4.2) of type + FRAME_SIZE_ERROR.*/ + + /* ### But we currently upgrade this to a connection error */ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + + stream = serf_http2__stream_get(h2, sid, TRUE, TRUE); + + if (stream) + { + body = serf_bucket_prefix_create(body, + HTTP2_PRIORITY_DATA_SIZE, + http2_handle_priority, + stream, h2->allocator); + } + + /* Just reading will do the right thing now */ + process_bucket = body; + break; + + /* ---------------------------------------------------- */ + case HTTP2_FRAME_TYPE_RST_STREAM: + if (sid == 0) + { + /* RST_STREAM frames MUST be associated with a stream. + If a RST_STREAM frame is received with a stream + identifier of 0x0, the recipient MUST treat this as a + connection error (Section 5.4.1) of type PROTOCOL_ERROR. + */ + + return SERF_ERROR_HTTP2_PROTOCOL_ERROR; + } + else if (serf_bucket_get_remaining(body) + != HTTP2_RST_DATA_SIZE) + { + /* A RST_STREAM frame with a length other than 4 octets MUST + be treated as a connection error (Section 5.4.1) of type + FRAME_SIZE_ERROR. */ + + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + + stream = serf_http2__stream_get(h2, sid, TRUE, TRUE); + + if (stream) + { + body = serf_bucket_prefix_create(body, + HTTP2_FRAME_TYPE_RST_STREAM, + http2_handle_frame_reset, + stream, h2->allocator); + } + + /* Just reading will do the right thing now */ + process_bucket = body; + break; + + /* ---------------------------------------------------- */ + case HTTP2_FRAME_TYPE_SETTINGS: + if (sid != 0) + { + /* SETTINGS frames always apply to a connection, never a + single stream. 
The stream identifier for a SETTINGS + frame MUST be zero (0x0). If an endpoint receives a + SETTINGS frame whose stream identifier field is + anything other than 0x0, the endpoint MUST respond + with a connection error (Section 5.4.1) of type + PROTOCOL_ERROR. + */ + return SERF_ERROR_HTTP2_PROTOCOL_ERROR; + } + + remaining = (apr_size_t)serf_bucket_get_remaining(body); + if (frameflags & HTTP2_FLAG_ACK) + { + if (remaining != 0) + { + /* When this bit is set, the payload of the SETTINGS + frame MUST be empty. Receipt of a SETTINGS frame + with the ACK flag set and a length field value + other than 0 MUST be treated as a connection error + (Section 5.4.1) of type FRAME_SIZE_ERROR. */ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + h2->setting_acks++; + } + else if ((remaining % HTTP2_SETTING_SIZE) != 0) + { + /* A SETTINGS frame with a length other than a multiple of + 6 octets MUST be treated as a connection error (Section + 5.4.1) of type FRAME_SIZE_ERROR. */ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + else + { + /* Just read everything... We checked it against our + max-framesize */ + body = serf_bucket_prefix_create(body, remaining, + http2_handle_settings, h2, + h2->allocator); + } + + /* Just reading will do the right thing now */ + process_bucket = body; + break; + + /* ---------------------------------------------------- */ + case HTTP2_FRAME_TYPE_PING: + if (sid != 0) + { + /* PING frames are not associated with any individual + stream. If a PING frame is received with a stream + identifier field value other than 0x0, the recipient + MUST respond with a connection error (Section 5.4.1) + of type PROTOCOL_ERROR.*/ + return SERF_ERROR_HTTP2_PROTOCOL_ERROR; + } + else if (serf_bucket_get_remaining(body) + != HTTP2_PING_DATA_SIZE) + { + /* Receipt of a PING frame with a length field value other + than 8 MUST be treated as a connection error (Section + 5.4.1) of type FRAME_SIZE_ERROR.. 
*/ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + + body = serf_bucket_prefix_create(body, HTTP2_PING_DATA_SIZE, + (frameflags & HTTP2_FLAG_ACK) + ? http2_handle_ping + : http2_handle_ping_ack, + h2, h2->allocator); + + /* Just reading will do the right thing now */ + process_bucket = body; + break; + /* ---------------------------------------------------- */ + case HTTP2_FRAME_TYPE_GOAWAY: + if (sid != 0) + { + /* The GOAWAY frame applies to the connection, not a + specific stream. An endpoint MUST treat a GOAWAY frame + with a stream identifier other than 0x0 as a connection + error(Section 5.4.1) of type PROTOCOL_ERROR. */ + return SERF_ERROR_HTTP2_PROTOCOL_ERROR; + } + + /* As the final go-away frame is best effort only we are not + checking the bodysize against HTTP2_GOWAWAY_DATA_SIZE here. + We'll see what we get in the goaway handler. + + Go away frames may contain additional opaque debug + information at the end, so instead of reading + HTTP2_GOWAWAY_DATA_SIZE bytes, we just read the whole frame. + */ + remaining = (apr_size_t)serf_bucket_get_remaining(body); + + body = serf_bucket_prefix_create(body, remaining, + http2_handle_goaway, h2, + h2->allocator); + + /* Just reading will do the right thing now */ + process_bucket = body; + break; + /* ---------------------------------------------------- */ + case HTTP2_FRAME_TYPE_WINDOW_UPDATE: + if (serf_bucket_get_remaining(body) + != HTTP2_WINDOW_UPDATE_DATA_SIZE) + { + /* A WINDOW_UPDATE frame with a length other than 4 octets + MUST be treated as a connection error (Section 5.4.1) + of type FRAME_SIZE_ERROR. 
*/ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + + if (sid == 0) + { + body = serf_bucket_prefix_create( + body, + HTTP2_WINDOW_UPDATE_DATA_SIZE, + http2_handle_connection_window_update, h2, + h2->allocator); + } + else + { + stream = serf_http2__stream_get(h2, sid, TRUE, TRUE); + + if (stream) + body = serf_bucket_prefix_create( + body, + HTTP2_WINDOW_UPDATE_DATA_SIZE, + http2_handle_stream_window_update, stream, + h2->allocator); + } + + /* Just reading will do the right thing now */ + process_bucket = body; + break; + + /* ---------------------------------------------------- */ + case HTTP2_FRAME_TYPE_CONTINUATION: + if (!h2->continuation_bucket + || sid != h2->continuation_streamid) + { + /* A CONTINUATION frame MUST be preceded by a HEADERS, + PUSH_PROMISE or CONTINUATION frame without the + END_HEADERS flag set. A recipient that observes + violation of this rule MUST respond with a connection + error(Section 5.4.1) of type PROTOCOL_ERROR. */ + h2->continuation_bucket = NULL; + h2->continuation_streamid = 0; + return SERF_ERROR_HTTP2_PROTOCOL_ERROR; + } + + serf_bucket_aggregate_append(h2->continuation_bucket, body); + + if (frameflags & HTTP2_FLAG_END_HEADERS) + { + h2->continuation_bucket = NULL; + h2->continuation_streamid = 0; + } + + return APR_SUCCESS; + + /* ---------------------------------------------------- */ + default: + /* We explicitly ignore all other frames as required, + so reading will do the right thing now */ + process_bucket = body; + } /* switch */ + + if (body) + serf_bucket_set_config(body, h2->config); + + SERF_H2_assert(h2->processor == NULL); + + if (process_handler) + { + h2->processor = process_handler; + h2->processor_baton = process_baton; + } + else + { + SERF_H2_assert(process_bucket != NULL); + h2->processor = http2_bucket_processor; + h2->processor_baton = process_bucket; + } + } + } /* while(TRUE) */ } static apr_status_t @@ -510,7 +1241,7 @@ http2_protocol_read(serf_connection_t *c conn->ctx->dirty_pollset = 1; } - 
status = http2_read(conn); + status = http2_process(conn->protocol_baton); if (!status) return APR_SUCCESS; @@ -601,8 +1332,8 @@ serf_http2__allocate_stream_id(void *bat */ if (stream->streamid < 0) { - stream->streamid = stream->h2->next_local_streamid; - stream->h2->next_local_streamid += 2; + stream->streamid = stream->h2->lr_next_streamid; + stream->h2->lr_next_streamid += 2; if (stream->status == H2S_INIT) stream->status = H2S_IDLE; @@ -628,7 +1359,7 @@ serf_http2__stream_get(serf_http2_protoc if (streamid < 0) return NULL; - for (stream = h2->first; stream; stream->next) + for (stream = h2->first; stream; stream = stream->next) { if (stream->streamid == streamid) { @@ -640,13 +1371,13 @@ serf_http2__stream_get(serf_http2_protoc } if (create_for_remote - && (streamid & 0x01) == (h2->next_remote_streamid & 0x01)) + && (streamid & 0x01) == (h2->rl_next_streamid & 0x01)) { serf_http2_stream_t *rs; stream = serf_http2__stream_create(h2, streamid, - h2->default_lr_window, - h2->default_rl_window, - h2->conn->allocator); + h2->lr_default_window, + h2->rl_default_window, + h2->allocator); if (h2->first) { @@ -657,10 +1388,10 @@ serf_http2__stream_get(serf_http2_protoc else h2->last = h2->first = stream; - if (streamid < h2->next_remote_streamid) + if (streamid < h2->rl_next_streamid) stream->status = H2S_CLOSED; else - h2->next_remote_streamid = (streamid + 2); + h2->rl_next_streamid = (streamid + 2); for (rs = h2->first; rs; rs = rs->next) { @@ -681,3 +1412,11 @@ serf_http2__stream_get(serf_http2_protoc } return NULL; } + +apr_status_t +serf_http2__enqueue_stream_reset(serf_http2_protocol_t *h2, + apr_int32_t streamid, + apr_status_t reason) +{ + return APR_SUCCESS; +} Modified: serf/trunk/protocols/http2_protocol.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1711679&r1=1711678&r2=1711679&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.h (original) +++ 
serf/trunk/protocols/http2_protocol.h Sat Oct 31 19:53:57 2015 @@ -24,6 +24,13 @@ #include "serf.h" #include "serf_private.h" +#ifdef _DEBUG +#include <assert.h> +#define SERF_H2_assert(x) assert(x) +#else +#define SERF_H2_assert(x) ((void)0) +#endif + #define SERF_LOGHTTP2 \ SERF_LOGCOMP_PROTOCOL, (__FILE__ ":" APR_STRINGIFY(__LINE__)) @@ -34,9 +41,19 @@ extern "C" { /* ********** HTTP2 Frame types ********** */ /* The standard maximum framesize. Always supported */ -#define HTTP2_DEFAULT_MAX_FRAMESIZE 16384 +#define HTTP2_DEFAULT_MAX_FRAMESIZE 16384 /* The default stream and connection window size before updates */ -#define HTTP2_DEFAULT_WINDOW_SIZE 65535 +#define HTTP2_DEFAULT_WINDOW_SIZE 65535 +#define HTTP2_DEFAULT_MAX_CONCURRENT 0xFFFFFFFF + +#define HTTP2_PRIORITY_DATA_SIZE 5 +#define HTTP2_RST_DATA_SIZE 4 +#define HTTP2_PROMISE_DATA_SIZE 4 +#define HTTP2_PING_DATA_SIZE 8 +#define HTTP2_GOWAWAY_DATA_SIZE 8 +#define HTTP2_WINDOW_UPDATE_DATA_SIZE 4 + +#define HTTP2_SETTING_SIZE 6 /* Frame type is an 8 bit unsigned integer */ @@ -87,17 +104,20 @@ extern "C" { /* ------------------------------------- */ typedef struct serf_http2_protocol_t serf_http2_protocol_t; +typedef struct serf_http2_stream_data_t serf_http2_stream_data_t; + typedef struct serf_http2_stream_t { struct serf_http2_protocol_t *h2; serf_bucket_alloc_t *alloc; + /* Opaque implementation details */ + serf_http2_stream_data_t *data; + /* Linked list of currently existing streams */ struct serf_http2_stream_t *next; struct serf_http2_stream_t *prev; - serf_request_t *request; /* May be NULL as streams may outlive requests */ - apr_int64_t lr_window; /* local->remote */ apr_int64_t rl_window; /* remote->local */ @@ -119,6 +139,10 @@ typedef struct serf_http2_stream_t /* TODO: Priority, etc. 
*/ } serf_http2_stream_t; +typedef apr_status_t (* serf_http2_processor_t)(void *baton, + serf_http2_protocol_t *h2, + serf_bucket_t *body); + /* Enques an http2 frame for output */ apr_status_t serf_http2__enqueue_frame(serf_http2_protocol_t *h2, @@ -133,6 +157,12 @@ serf_http2__stream_create(serf_http2_pro apr_uint32_t rl_window, serf_bucket_alloc_t *alloc); + +apr_status_t +serf_http2__enqueue_stream_reset(serf_http2_protocol_t *h2, + apr_int32_t streamid, + apr_status_t reason); + /* Allocates a new stream id for a stream. BATON is a serf_http2_stream_t * instance. @@ -147,10 +177,10 @@ void serf_http2__stream_cleanup(serf_http2_stream_t *stream); serf_http2_stream_t * -serf_http2__stream_get_by_id(serf_http2_protocol_t *h2, - apr_int32_t streamid, - int create_for_remote, - int move_first); +serf_http2__stream_get(serf_http2_protocol_t *h2, + apr_int32_t streamid, + int create_for_remote, + int move_first); /* Sets up STREAM to handle REQUEST */ apr_status_t @@ -158,7 +188,33 @@ serf_http2__stream_setup_request(serf_ht serf_hpack_table_t *hpack_tbl, serf_request_t *request); +apr_status_t +serf_http2__stream_reset(serf_http2_stream_t *stream, + apr_status_t reason, + int local_reset); + +serf_bucket_t * +serf_http2__stream_handle_hpack(serf_http2_stream_t *stream, + serf_bucket_t *bucket, + unsigned char frametype, + int end_stream, + apr_size_t max_entry_size, + serf_hpack_table_t *hpack_tbl, + serf_config_t *config, + serf_bucket_alloc_t *allocator); + +serf_bucket_t * +serf_http2__stream_handle_data(serf_http2_stream_t *stream, + serf_bucket_t *bucket, + unsigned char frametype, + int end_stream, + serf_config_t *config, + serf_bucket_alloc_t *allocator); +apr_status_t +serf_http2__stream_processor(void *baton, + serf_http2_protocol_t *h2, + serf_bucket_t *bucket); #ifdef __cplusplus } Modified: serf/trunk/protocols/http2_stream.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1711679&r1=1711678&r2=1711679&view=diff 
============================================================================== --- serf/trunk/protocols/http2_stream.c (original) +++ serf/trunk/protocols/http2_stream.c Sat Oct 31 19:53:57 2015 @@ -29,6 +29,12 @@ #include "protocols/http2_buckets.h" #include "protocols/http2_protocol.h" +struct serf_http2_stream_data_t +{ + serf_request_t *request; /* May be NULL as streams may outlive requests */ + serf_bucket_t *response_agg; +}; + serf_http2_stream_t * serf_http2__stream_create(serf_http2_protocol_t *h2, apr_int32_t streamid, @@ -41,8 +47,12 @@ serf_http2__stream_create(serf_http2_pro stream->h2 = h2; stream->alloc = alloc; + stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data)); + stream->next = stream->prev = NULL; - stream->request = NULL; + + stream->data->request = NULL; + stream->data->response_agg = NULL; stream->lr_window = lr_window; stream->rl_window = rl_window; @@ -60,6 +70,14 @@ serf_http2__stream_create(serf_http2_pro void serf_http2__stream_cleanup(serf_http2_stream_t *stream) { + if (stream->data) + { + if (stream->data->response_agg) + serf_bucket_destroy(stream->data->response_agg); + + serf_bucket_mem_free(stream->alloc, stream->data); + stream->data = NULL; + } serf_bucket_mem_free(stream->alloc, stream); } @@ -72,7 +90,7 @@ serf_http2__stream_setup_request(serf_ht serf_bucket_t *hpack; serf_bucket_t *body; - stream->request = request; + stream->data->request = request; if (!request->req_bkt) { @@ -112,3 +130,145 @@ serf_http2__stream_setup_request(serf_ht return APR_SUCCESS; } + +apr_status_t +serf_http2__stream_reset(serf_http2_stream_t *stream, + apr_status_t reason, + int local_reset) +{ + stream->status = H2S_CLOSED; + + if (stream->streamid < 0) + return APR_SUCCESS; + + if (local_reset) + return serf_http2__enqueue_stream_reset(stream->h2, + stream->streamid, + reason); + + return APR_SUCCESS; +} + +apr_status_t +stream_response_eof(void *baton, + serf_bucket_t *aggregate_bucket) +{ + serf_http2_stream_t *stream = baton; 
+ + switch (stream->status) + { + case H2S_CLOSED: + case H2S_HALFCLOSED_REMOTE: + return APR_EOF; + default: + return APR_EAGAIN; + } +} + +serf_bucket_t * +serf_http2__stream_handle_hpack(serf_http2_stream_t *stream, + serf_bucket_t *bucket, + unsigned char frametype, + int end_stream, + apr_size_t max_entry_size, + serf_hpack_table_t *hpack_tbl, + serf_config_t *config, + serf_bucket_alloc_t *allocator) +{ + if (!stream->data->response_agg) + { + stream->data->response_agg = serf_bucket_aggregate_create(stream->alloc); + serf_bucket_aggregate_hold_open(stream->data->response_agg, + stream_response_eof, stream); + serf_bucket_set_config(stream->data->response_agg, config); + } + + bucket = serf__bucket_hpack_decode_create(bucket, NULL, NULL, max_entry_size, + hpack_tbl, allocator); + + serf_bucket_aggregate_append(stream->data->response_agg, bucket); + + if (end_stream) + { + if (stream->status == H2S_HALFCLOSED_LOCAL) + stream->status = H2S_CLOSED; + else + stream->status = H2S_HALFCLOSED_REMOTE; + } + + return NULL; +} + +serf_bucket_t * +serf_http2__stream_handle_data(serf_http2_stream_t *stream, + serf_bucket_t *bucket, + unsigned char frametype, + int end_stream, + serf_config_t *config, + serf_bucket_alloc_t *allocator) +{ + if (!stream->data->response_agg) + { + stream->data->response_agg = serf_bucket_aggregate_create(stream->alloc); + serf_bucket_aggregate_hold_open(stream->data->response_agg, + stream_response_eof, stream); + + serf_bucket_set_config(stream->data->response_agg, config); + } + + serf_bucket_aggregate_append(stream->data->response_agg, bucket); + + if (end_stream) + { + if (stream->status == H2S_HALFCLOSED_LOCAL) + stream->status = H2S_CLOSED; + else + stream->status = H2S_HALFCLOSED_REMOTE; + } + + return NULL; +} + +apr_status_t +serf_http2__stream_processor(void *baton, + serf_http2_protocol_t *h2, + serf_bucket_t *bucket) +{ + serf_http2_stream_t *stream = baton; + apr_status_t status = APR_SUCCESS; + + if 
(!stream->data->response_agg) + return APR_EAGAIN; + + /* ### TODO: Delegate to request */ + while (!status) + { + const char *data; + apr_size_t len; + + status = serf_bucket_read(stream->data->response_agg, + SERF_READ_ALL_AVAIL, &data, &len); + + if (!SERF_BUCKET_READ_ERROR(status)) + { + char *printable = serf_bstrmemdup(bucket->allocator, data, len); + char *c; + + for (c = printable; *c; c++) + { + if (((*c < ' ') || (*c > '\x7E')) && !strchr("\r\n", *c)) /* Poor mans isctrl*/ + { + *c = ' '; + } + } + +#ifdef _DEBUG + fputs(printable, stdout); +#endif + + serf_bucket_mem_free(bucket->allocator, printable); + } + } + + return status; +}