Author: rhuijben Date: Wed Nov 4 14:43:01 2015 New Revision: 1712557 URL: http://svn.apache.org/viewvc?rev=1712557&view=rev Log: Update the http2 framing bucket -responsible for creating frames- to just create a single frame for passed data. The updated implementation should perform less copying when possible, but tries to create the frame whatever it takes.
I decided that splitting data over multiple frames belongs on a different layer as the two cases where this is needed need different behavior anyway. * buckets/copy_buckets.c (serf__copy_iovec): Make library private instead of static. * buckets/http2_frame_buckets.c (serf_http2_frame_context_t): Simplify state. (serf__bucket_http2_frame_create): Remove some arguments. Limits max_payload_size to a uint32, as that is how http2 configures it and it has to fit in 24 bits anyway. (http2_prepare_frame): Try to determine the payload size without copying and only fall back if that doesn't work. (serf_http2_frame_read, serf_http2_frame_read_iovec): Verify that the payload didn't shrink or grow. (serf_http2_frame_peek): Simplify a bit. * protocols/http2_buckets.h (serf__bucket_http2_frame_create): Remove some arguments. * protocols/http2_protocol.c (serf__http2_protocol_init, http2_handle_ping, http2_handle_settings): Update caller. (http2_handle_goaway, apr_status_t): Minor logging tweak. * protocols/http2_stream.c (serf_http2__stream_setup_next_request): Update caller. * serf_private.h (serf__copy_iovec): Set here. * test/test_buckets.c (test_http2_frame_bucket_basic): Update caller. Modified: serf/trunk/buckets/copy_buckets.c serf/trunk/buckets/http2_frame_buckets.c serf/trunk/protocols/http2_buckets.h serf/trunk/protocols/http2_protocol.c serf/trunk/protocols/http2_stream.c serf/trunk/serf_private.h serf/trunk/test/test_buckets.c Modified: serf/trunk/buckets/copy_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/buckets/copy_buckets.c?rev=1712557&r1=1712556&r2=1712557&view=diff ============================================================================== --- serf/trunk/buckets/copy_buckets.c (original) +++ serf/trunk/buckets/copy_buckets.c Wed Nov 4 14:43:01 2015 @@ -58,10 +58,10 @@ serf_bucket_t *serf_bucket_copy_create( return serf_bucket_create(&serf_bucket_type_copy, allocator, ctx); } -static void serf__copy_iovec(char *data, - apr_size_t *copied, - struct iovec *vecs, - int vecs_used) +void serf__copy_iovec(char *data, + apr_size_t *copied, + struct iovec *vecs, + int vecs_used) { int i; apr_size_t sz = 0; Modified: serf/trunk/buckets/http2_frame_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/buckets/http2_frame_buckets.c?rev=1712557&r1=1712556&r2=1712557&view=diff ============================================================================== --- serf/trunk/buckets/http2_frame_buckets.c (original) +++ serf/trunk/buckets/http2_frame_buckets.c Wed Nov 4 14:43:01 2015 @@ -610,14 +610,13 @@ typedef struct serf_http2_frame_context_ serf_bucket_t *stream; serf_bucket_alloc_t *alloc; serf_bucket_t *chunk; - apr_status_t stream_status; + apr_size_t bytes_remaining; apr_size_t max_payload_size; + apr_int32_t stream_id; unsigned char frametype; unsigned char flags; - char end_of_stream; - char end_of_headers; char created_frame; apr_int32_t *p_stream_id; @@ -625,12 +624,6 @@ typedef struct serf_http2_frame_context_ void (*stream_id_alloc)(void *baton, apr_int32_t *stream_id); apr_size_t current_window; - void *alloc_window_baton; - apr_int32_t (*alloc_window)(void *baton, - unsigned char frametype, - apr_int32_t stream_id, - apr_size_t requested, - int peek); } serf_http2_frame_context_t; @@ -643,14 +636,7 @@ serf__bucket_http2_frame_create(serf_buc void *baton, apr_int32_t *stream_id), void *stream_id_baton, - apr_size_t max_payload_size, - apr_int32_t (*alloc_window)( - void *baton, - unsigned char frametype, - apr_int32_t stream_id, - apr_size_t requested, - int peek), - void *alloc_window_baton, + apr_uint32_t max_payload_size, serf_bucket_alloc_t *alloc) { serf_http2_frame_context_t *ctx = serf_bucket_mem_alloc(alloc, sizeof(*ctx)); @@ -658,7 +644,6 @@ serf__bucket_http2_frame_create(serf_buc ctx->alloc = alloc; ctx->stream = stream; ctx->chunk = serf_bucket_aggregate_create(alloc); - ctx->stream_status = APR_SUCCESS; ctx->max_payload_size = max_payload_size; ctx->frametype = frame_type; ctx->flags = flags; @@ -688,10 +673,7 @@ serf__bucket_http2_frame_create(serf_buc } ctx->current_window = 0; - ctx->alloc_window = alloc_window; - ctx->alloc_window_baton = alloc_window_baton; - - ctx->end_of_stream = ctx->end_of_headers = ctx->created_frame = FALSE; + ctx->created_frame = FALSE; return serf_bucket_create(&serf_bucket_type__http2_frame, alloc, ctx); } @@ -700,63 +682,169 @@ static apr_status_t http2_prepare_frame(serf_bucket_t *bucket) { serf_http2_frame_context_t *ctx = bucket->data; - struct iovec vecs[512]; int vecs_used; - apr_size_t len; - unsigned char frame[FRAME_PREFIX_SIZE]; - int i; + apr_uint64_t payload_remaining; if (ctx->created_frame) return APR_SUCCESS; - ctx->created_frame = TRUE; + /* How long will this frame be? */ + if (!ctx->stream) + payload_remaining = 0; + else + payload_remaining = serf_bucket_get_remaining(ctx->stream); - if (ctx->stream) + if (payload_remaining != SERF_LENGTH_UNKNOWN + && payload_remaining > ctx->max_payload_size) + { + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + else if (payload_remaining != SERF_LENGTH_UNKNOWN) { - ctx->stream_status = serf_bucket_read_iovec(ctx->stream, - ctx->max_payload_size, - 512, vecs, &vecs_used); + if (ctx->stream) + serf_bucket_aggregate_append(ctx->chunk, ctx->stream); - if (SERF_BUCKET_READ_ERROR(ctx->stream_status)) - return ctx->stream_status; + ctx->stream = NULL; /* Now managed by aggregate */ } else { - vecs_used = 0; - ctx->stream_status = APR_EOF; + /* Our payload doesn't know how long it is. Our only option + now is to create the actual data */ + struct iovec vecs[512]; + apr_status_t status; + + status = serf_bucket_read_iovec(ctx->stream, ctx->max_payload_size, + 512, vecs, &vecs_used); + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + else if (APR_STATUS_IS_EOF(status)) + { + /* OK, we got everything, let's put the data at the start of the + aggregate. */ + serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_used); + + /* Obtain the size now , to avoid problems when the bucket + doesn't know that it has nothing remaining*/ + payload_remaining = serf_bucket_get_remaining(ctx->chunk); + + /* Just add the stream behind the iovecs. This keeps the chunks + available exactly until they are no longer necessary */ + serf_bucket_aggregate_append(ctx->chunk, ctx->stream); + ctx->stream = NULL; /* Managed by aggregate */ + + if (payload_remaining == SERF_LENGTH_UNKNOWN) + { + /* Should never happen: + Aggregate with only iovecs should know size */ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + } + else + { + /* Auch... worst case scenario, we have to copy the data. Luckily + we have an absolute limit after which we may error out */ + apr_size_t total = 0; + char *data = serf_bucket_mem_alloc(bucket->allocator, + ctx->max_payload_size); + + serf__copy_iovec(data, &total, vecs, vecs_used); + + while (!APR_STATUS_IS_EOF(status) + && total < ctx->max_payload_size) + { + apr_size_t read; + status = serf_bucket_read_iovec(ctx->stream, + ctx->max_payload_size - total + 1, + 512, vecs, &vecs_used); + + if (SERF_BUCKET_READ_ERROR(status)) + { + serf_bucket_mem_free(bucket->allocator, data); + return status; + } + + serf__copy_iovec(data, &read, vecs, vecs_used); + total += read; + + if (status && !APR_STATUS_IS_EOF(status)) + { + /* Checkpoint what we got now... + + Next time this function is called the buffer is read first and + then continued from the original stream */ + serf_bucket_t *new_stream; + new_stream = serf_bucket_aggregate_create(bucket->allocator); + + serf_bucket_aggregate_append( + new_stream, + serf_bucket_simple_own_create(data, total, bucket->allocator)); + + serf_bucket_aggregate_append(new_stream, ctx->stream); + ctx->stream = new_stream; + + return status; + } + } + + if (total > ctx->max_payload_size) + { + /* The chunk is at least 1 byte bigger then allowed */ + serf_bucket_mem_free(bucket->allocator, data); + + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + else + { + /* Ok, we have what we need in our buffer */ + serf_bucket_aggregate_append( + ctx->chunk, + serf_bucket_simple_own_create(data, total, bucket->allocator)); + payload_remaining = total; + + /* And we no longer need stream */ + serf_bucket_destroy(ctx->stream); + ctx->stream = NULL; + } + } } - /* For this first version assume that everything fits in a single frame */ - if (! APR_STATUS_IS_EOF(ctx->stream_status)) - abort(); /* Not implemented yet */ - if (ctx->stream_id < 0 && ctx->stream_id_alloc) - { - ctx->stream_id_alloc(ctx->stream_id_baton, ctx->p_stream_id); - ctx->stream_id = *ctx->p_stream_id; - } - len = 0; - for (i = 0; i < vecs_used; i++) - len += vecs[i].iov_len; + /* Ok, now we can construct the frame */ + ctx->created_frame = TRUE; + { + unsigned char frame[FRAME_PREFIX_SIZE]; - frame[0] = (len >> 16) & 0xFF; - frame[1] = (len >> 8) & 0xFF; - frame[2] = len & 0xFF; - frame[3] = ctx->frametype; - frame[4] = ctx->flags; - frame[5] = ((apr_uint32_t)ctx->stream_id >> 24) & 0x7F; - frame[6] = ((apr_uint32_t)ctx->stream_id >> 16) & 0xFF; - frame[7] = ((apr_uint32_t)ctx->stream_id >> 8) & 0xFF; - frame[8] = ctx->stream_id & 0xFF; + /* Allocate the streamid if there isn't one. + Once the streamid hits the wire it automatically closes all + unused identifiers < this value. + */ + if (ctx->stream_id < 0 && ctx->stream_id_alloc) + { + ctx->stream_id_alloc(ctx->stream_id_baton, ctx->p_stream_id); + ctx->stream_id = *ctx->p_stream_id; + } + + frame[0] = (payload_remaining >> 16) & 0xFF; + frame[1] = (payload_remaining >> 8) & 0xFF; + frame[2] = payload_remaining & 0xFF; + frame[3] = ctx->frametype; + frame[4] = ctx->flags; + frame[5] = ((apr_uint32_t)ctx->stream_id >> 24) & 0x7F; + frame[6] = ((apr_uint32_t)ctx->stream_id >> 16) & 0xFF; + frame[7] = ((apr_uint32_t)ctx->stream_id >> 8) & 0xFF; + frame[8] = ctx->stream_id & 0xFF; - serf_bucket_aggregate_append(ctx->chunk, + /* Put the frame before the data */ + serf_bucket_aggregate_prepend(ctx->chunk, serf_bucket_simple_copy_create((const char *)&frame, FRAME_PREFIX_SIZE, ctx->alloc)); - if (vecs_used > 0) - serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_used); + /* And set the amount of data that we verify will be read */ + ctx->bytes_remaining = (apr_size_t)payload_remaining + FRAME_PREFIX_SIZE; + } return APR_SUCCESS; } @@ -775,8 +863,24 @@ serf_http2_frame_read(serf_bucket_t *buc status = serf_bucket_read(ctx->chunk, requested, data, len); + if (!SERF_BUCKET_READ_ERROR(status)) + { + if (*len > ctx->bytes_remaining) + { + /* Frame payload resized after the header was written */ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + ctx->bytes_remaining -= *len; + } + if (APR_STATUS_IS_EOF(status)) - return ctx->stream_status; + { + if (ctx->bytes_remaining > 0) + { + /* Frame payload resized after the header was written */ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + } return status; } @@ -798,8 +902,30 @@ serf_http2_frame_read_iovec(serf_bucket_ status = serf_bucket_read_iovec(ctx->chunk, requested, vecs_size, vecs, vecs_used); + if (!SERF_BUCKET_READ_ERROR(status)) + { + apr_size_t len = 0; + int i; + + for (i = 0; i < *vecs_used; i++) + len += vecs[i].iov_len; + + if (len > ctx->bytes_remaining) + { + /* Frame resized after the header was written */ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + ctx->bytes_remaining -= len; + } + if (APR_STATUS_IS_EOF(status)) - return ctx->stream_status; + { + if (ctx->bytes_remaining > 0) + { + /* Frame payload resized after the header was written */ + return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR; + } + } return status; } @@ -814,14 +940,12 @@ serf_http2_frame_peek(serf_bucket_t *buc status = http2_prepare_frame(bucket); if (status) - return status; - - status = serf_bucket_peek(ctx->chunk, data, len); - - if (APR_STATUS_IS_EOF(status)) - return ctx->stream_status; + { + *len = 0; + return APR_SUCCESS; + } - return status; + return serf_bucket_peek(ctx->chunk, data, len); } static void Modified: serf/trunk/protocols/http2_buckets.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_buckets.h?rev=1712557&r1=1712556&r2=1712557&view=diff ============================================================================== --- serf/trunk/protocols/http2_buckets.h (original) +++ serf/trunk/protocols/http2_buckets.h Wed Nov 4 14:43:01 2015 @@ -211,14 +211,7 @@ serf__bucket_http2_frame_create(serf_buc void *baton, apr_int32_t *stream_id), void *stream_id_baton, - apr_size_t max_payload_size, - apr_int32_t(*alloc_window)( - void *baton, - unsigned char frametype, - apr_int32_t stream_id, - apr_size_t requested, - int peek), - void *alloc_window_baton, + apr_uint32_t max_payload_size, serf_bucket_alloc_t *alloc); /* ==================================================================== */ Modified: serf/trunk/protocols/http2_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1712557&r1=1712556&r2=1712557&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.c (original) +++ serf/trunk/protocols/http2_protocol.c Wed Nov 4 14:43:01 2015 @@ -236,9 +236,9 @@ void serf__http2_protocol_init(serf_conn serf_bucket_t *window_size; tmp = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_SETTINGS, 0, - NULL, NULL, NULL, /* Static id: 0*/ - HTTP2_DEFAULT_MAX_FRAMESIZE, - NULL, NULL, conn->allocator); + NULL, NULL, NULL, /* stream: 0 */ + h2->lr_max_framesize, + conn->allocator); serf_http2__enqueue_frame(h2, tmp, FALSE); @@ -246,9 +246,9 @@ void serf__http2_protocol_init(serf_conn window_size = serf_bucket_create_numberv(conn->allocator, "4", 0x40000000); tmp = serf__bucket_http2_frame_create(window_size, HTTP2_FRAME_TYPE_WINDOW_UPDATE, 0, - NULL, NULL, NULL, - HTTP2_DEFAULT_MAX_FRAMESIZE, - NULL, NULL, conn->allocator); + NULL, NULL, NULL, /* stream: 0 */ + h2->lr_max_framesize, + conn->allocator); serf_http2__enqueue_frame(h2, tmp, FALSE); h2->rl_window += 0x40000000; /* And update our own administration */ @@ -587,7 +587,6 @@ http2_handle_ping(void *baton, HTTP2_FLAG_ACK, NULL, NULL, NULL, h2->lr_max_framesize, - NULL, NULL, h2->allocator), TRUE /* pump */); @@ -698,8 +697,8 @@ http2_handle_settings(void *baton, HTTP2_FRAME_TYPE_SETTINGS, HTTP2_FLAG_ACK, NULL, NULL, NULL, - HTTP2_DEFAULT_MAX_FRAMESIZE, - NULL, NULL, h2->allocator), + h2->lr_max_framesize, + h2->allocator), TRUE); return APR_SUCCESS; @@ -789,7 +788,7 @@ http2_handle_goaway(void *baton, len - HTTP2_GOWAWAY_DATA_SIZE); serf__log(loglevel, SERF_LOGHTTP2, h2->config, - "Received GOAWAY, last-stream=%d, error=%u: %s\n", + "Received GOAWAY, last-stream=0x%x, error=%u: %s\n", last_streamid, error_code, goaway_text); serf_bucket_mem_free(h2->allocator, goaway_text); @@ -797,7 +796,7 @@ http2_handle_goaway(void *baton, else { serf__log(loglevel, SERF_LOGHTTP2, h2->config, - "Received GOAWAY, last-stream=%d, error=%u.\n", + "Received GOAWAY, last-stream=0x%x, error=%u.\n", last_streamid, error_code); } @@ -971,7 +970,7 @@ http2_process(serf_http2_protocol_t *h2) } serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config, - "Reading 0x%x frame, stream=%x, flags=0x%x\n", + "Reading 0x%x frame, stream=0x%x, flags=0x%x\n", frametype, sid, frameflags); /* If status is EOF then the frame doesn't have/declare a body */ @@ -1611,7 +1610,7 @@ serf_http2__enqueue_stream_reset(serf_ht serf__bucket_http2_frame_create(bkt, HTTP2_FRAME_TYPE_RST_STREAM, 0, &streamid, NULL, NULL, - h2->lr_max_framesize, NULL, NULL, + h2->lr_max_framesize, h2->allocator), TRUE); } Modified: serf/trunk/protocols/http2_stream.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1712557&r1=1712556&r2=1712557&view=diff ============================================================================== --- serf/trunk/protocols/http2_stream.c (original) +++ serf/trunk/protocols/http2_stream.c Wed Nov 4 14:43:01 2015 @@ -139,7 +139,7 @@ serf_http2__stream_setup_next_request(se serf_http2__allocate_stream_id, stream, HTTP2_DEFAULT_MAX_FRAMESIZE, - NULL, NULL, request->allocator); + request->allocator); serf_http2__enqueue_frame(stream->h2, hpack, TRUE); Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1712557&r1=1712556&r2=1712557&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Wed Nov 4 14:43:01 2015 @@ -439,6 +439,13 @@ struct serf_connection_t { /*** Internal bucket functions ***/ +/* Copies all data contained in vecs to *data, optionally telling how much was + copied */ +void serf__copy_iovec(char *data, + apr_size_t *copied, + struct iovec *vecs, + int vecs_used); + /** Transform a response_bucket in-place into an aggregate bucket. Restore the status line and all headers, not just the body. Modified: serf/trunk/test/test_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/test/test_buckets.c?rev=1712557&r1=1712556&r2=1712557&view=diff ============================================================================== --- serf/trunk/test/test_buckets.c (original) +++ serf/trunk/test/test_buckets.c Wed Nov 4 14:43:01 2015 @@ -2483,7 +2483,7 @@ static void test_http2_frame_bucket_basi body_in = SERF_BUCKET_SIMPLE_STRING("This is no config!", alloc); frame_in = serf__bucket_http2_frame_create(body_in, 99, 7, &exp_streamid, NULL, NULL, - 16384, NULL, NULL, alloc); + 16384, alloc); frame_out = serf__bucket_http2_unframe_create(frame_in, 16384, alloc); read_and_check_bucket(tc, frame_out, "This is no config!"); @@ -2532,6 +2532,7 @@ CuSuite *test_buckets(void) SUITE_ADD_TEST(suite, test_response_bucket_peek_at_headers); SUITE_ADD_TEST(suite, test_response_bucket_iis_status_code); SUITE_ADD_TEST(suite, test_response_bucket_no_reason); + SUITE_ADD_TEST(suite, test_response_continue); SUITE_ADD_TEST(suite, test_bucket_header_set); SUITE_ADD_TEST(suite, test_bucket_header_do); SUITE_ADD_TEST(suite, test_iovec_buckets);