Author: rhuijben
Date: Fri Oct 30 00:40:16 2015
New Revision: 1711394

URL: http://svn.apache.org/viewvc?rev=1711394&view=rev
Log:
Complete the implementation of the copy buckets as noted in the TODO comments.
* buckets/copy_buckets.c (serf_copy_read): Add implementation. (serf_copy_read_iovec): Add the missing pieces. (serf_copy_get_remaining): New function. (serf_copy_destroy): Destroy hold buffer. (serf_bucket_type_copy): Add get_remaining. * test/test_buckets.c (discard_data): Move helper function here. (test_serf_default_read_iovec): Rename this function to... (test_copy_bucket): ... this. Remove '#if 0' and extend. (discard_data): Move up. (test_buckets): Re-add item in order. Modified: serf/trunk/buckets/copy_buckets.c serf/trunk/test/test_buckets.c Modified: serf/trunk/buckets/copy_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/buckets/copy_buckets.c?rev=1711394&r1=1711393&r2=1711394&view=diff ============================================================================== --- serf/trunk/buckets/copy_buckets.c (original) +++ serf/trunk/buckets/copy_buckets.c Fri Oct 30 00:40:16 2015 @@ -62,12 +62,60 @@ static apr_status_t serf_copy_read(serf_ apr_size_t requested, const char **data, apr_size_t *len) { - /* ### peek to see how much is easily available. if it is MIN_SIZE, - ### then a read() would (likely) get that same amount. otherwise, - ### we should read an iovec and concatenate the result. */ + copy_context_t *ctx = bucket->data; + apr_status_t status; + const char *wdata; + apr_size_t peek_len; + struct iovec vecs[16]; + int vecs_used; + apr_size_t fetched; + + status = serf_bucket_peek(ctx->wrapped, &wdata, &peek_len); + + if (SERF_BUCKET_READ_ERROR(status)) { + *len = 0; + return status; + } + + /* Can we just return the peeked result? */ + if (status || requested <= peek_len || ctx->min_size <= peek_len) { - /* ### fix this return code */ - return APR_SUCCESS; + return serf_bucket_read(ctx->wrapped, requested, data, len); + } + + if (! 
ctx->hold_buf) + ctx->hold_buf = serf_bucket_mem_alloc(bucket->allocator, + ctx->min_size); + + /* Reduce requested to fit in our buffer if necessary*/ + if (requested > ctx->min_size) + requested = ctx->min_size; + + fetched = 0; + while (fetched < requested) { + int i; + status = serf_bucket_read_iovec(ctx->wrapped, requested - fetched, + 16, vecs, &vecs_used); + + if (SERF_BUCKET_READ_ERROR(status)) { + if (fetched > 0) + status = APR_EAGAIN; + break; + } + + for (i = 0; i < vecs_used; i++) { + memcpy(ctx->hold_buf + fetched, vecs[i].iov_base, vecs[i].iov_len); + fetched += vecs[i].iov_len; + } + + if (status) + break; + } + + *data = ctx->hold_buf; + *len = fetched; + + return status; } @@ -93,9 +141,26 @@ static apr_status_t serf_copy_read_iovec copy_context_t *ctx = bucket->data; apr_status_t status; apr_size_t total; + apr_size_t fetched; int i; - status = serf_bucket_read_iovec(bucket, requested, + /* If somebody really wants to call us for 1 iovec, call the function + that already implements the copying for this */ + if (vecs_size == 1) { + *vecs_used = 1; + const char *data; + apr_size_t len; + + status = serf_copy_read(bucket, requested, &data, &len); + + vecs[0].iov_base = (void*)data; + vecs[0].iov_len = len; + *vecs_used = 1; + + return status; + } + + status = serf_bucket_read_iovec(ctx->wrapped, requested, vecs_size, vecs, vecs_used); /* There are four possible results: @@ -125,12 +190,49 @@ static apr_status_t serf_copy_read_iovec /* Copy what we have into our buffer. Then continue reading to get at least MIN_SIZE or REQUESTED bytes of data. */ + if (! ctx->hold_buf) + ctx->hold_buf = serf_bucket_mem_alloc(bucket->allocator, + ctx->min_size); /* ### copy into HOLD_BUF. then read/append some more. */ + fetched = 0; + for (i = 0; i < *vecs_used; i++) { + memcpy(ctx->hold_buf + fetched, vecs[i].iov_base, vecs[i].iov_len); + fetched += vecs[i].iov_len; + } + /* ### point vecs[0] at HOLD_BUF. 
*/ + vecs[0].iov_base = ctx->hold_buf; + vecs[0].iov_len = fetched; - return status; + while (TRUE) { + int v_used; + + status = serf_bucket_read_iovec(ctx->wrapped, requested - fetched, + vecs_size - 1, &vecs[1], &v_used); + + if (SERF_BUCKET_READ_ERROR(status)) { + *vecs_used = 1; + return APR_EAGAIN; + } + + for (i = 1; i <= v_used; i++) + total += vecs[i].iov_len; + + if (status || total >= ctx->min_size || total == requested) { + *vecs_used = v_used + 1; + return status; + } + + for (i = 1; i <= v_used; i++) { + memcpy(ctx->hold_buf + fetched, vecs[i].iov_base, + vecs[i].iov_len); + fetched += vecs[i].iov_len; + } + + vecs[0].iov_len = fetched; + } } @@ -168,12 +270,20 @@ static apr_status_t serf_copy_peek(serf_ return serf_bucket_peek(ctx->wrapped, data, len); } +static apr_uint64_t serf_copy_get_remaining(serf_bucket_t *bucket) +{ + copy_context_t *ctx = bucket->data; + + return serf_bucket_get_remaining(ctx->wrapped); +} + static void serf_copy_destroy(serf_bucket_t *bucket) { -/* copy_context_t *ctx = bucket->data;*/ + copy_context_t *ctx = bucket->data; - /* ### kill the HOLD_BUF */ + if (ctx->hold_buf) + serf_bucket_mem_free(bucket->allocator, ctx->hold_buf); serf_default_destroy_and_data(bucket); } @@ -198,6 +308,6 @@ const serf_bucket_type_t serf_bucket_typ serf_copy_peek, serf_copy_destroy, serf_copy_read_bucket, - serf_default_get_remaining, + serf_copy_get_remaining, serf_copy_set_config, }; Modified: serf/trunk/test/test_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/test/test_buckets.c?rev=1711394&r1=1711393&r2=1711394&view=diff ============================================================================== --- serf/trunk/test/test_buckets.c (original) +++ serf/trunk/test/test_buckets.c Fri Oct 30 00:40:16 2015 @@ -161,6 +161,28 @@ void readlines_and_check_bucket(CuTest * CuAssert(tc, "Read less data than expected.", strlen(expected) == 0); } +static apr_status_t discard_data(serf_bucket_t *bkt, + apr_size_t *read_len) +{ + const char 
*data; + apr_size_t data_len; + apr_status_t status; + apr_size_t read; + + read = 0; + + do + { + status = serf_bucket_read(bkt, SERF_READ_ALL_AVAIL, &data, &data_len); + + if (!SERF_BUCKET_READ_ERROR(status)) { + read += data_len; + } + } while(status == APR_SUCCESS); + + *read_len = read; + return status; +} /******************************** TEST CASES **********************************/ @@ -981,20 +1003,18 @@ static void test_response_bucket_peek_at } #undef EXP_RESPONSE -/* ### this test is useful, but needs to switch to the new COPY bucket - ### to test the behavior. */ -#if 0 - /* Test that the internal function serf_default_read_iovec, used by many bucket types, groups multiple buffers in one iovec. */ -static void test_serf_default_read_iovec(CuTest *tc) +static void test_copy_bucket(CuTest *tc) { test_baton_t *tb = tc->testBaton; apr_status_t status; - serf_bucket_t *bkt, *aggbkt; - struct iovec tgt_vecs[32]; + serf_bucket_t *bkt, *aggbkt, *copybkt; + struct iovec tgt_vecs[2]; int vecs_used, i; apr_size_t actual_len = 0; + const char *data; + apr_size_t len; serf_bucket_alloc_t *alloc = serf_bucket_allocator_create(tb->pool, NULL, NULL); @@ -1005,6 +1025,7 @@ static void test_serf_default_read_iovec /* Test 1: multiple children, should be read in one iovec. 
*/ aggbkt = serf_bucket_aggregate_create(alloc); + copybkt = serf_bucket_copy_create(aggbkt, 1024, alloc); bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY, 20, alloc); serf_bucket_aggregate_append(aggbkt, bkt); @@ -1013,15 +1034,61 @@ static void test_serf_default_read_iovec bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY+40, strlen(BODY)-40, alloc); serf_bucket_aggregate_append(aggbkt, bkt); - status = serf_default_read_iovec(aggbkt, SERF_READ_ALL_AVAIL, 32, tgt_vecs, - &vecs_used); + CuAssertIntEquals(tc, strlen(BODY), + (int)serf_bucket_get_remaining(aggbkt)); + CuAssertIntEquals(tc, strlen(BODY), + (int)serf_bucket_get_remaining(copybkt)); + + /* When < min_size, everything should be read in one go */ + status = serf_bucket_read(copybkt, SERF_READ_ALL_AVAIL, &data, &len); + CuAssertIntEquals(tc, APR_EOF, status); + CuAssertIntEquals(tc, strlen(BODY), len); + + + /* Just reuse the existing aggbkt. Fill again */ + copybkt = serf_bucket_copy_create(aggbkt, 35, alloc); + bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY, 20, alloc); + serf_bucket_aggregate_append(aggbkt, bkt); + bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY + 20, 20, alloc); + serf_bucket_aggregate_append(aggbkt, bkt); + bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY + 40, strlen(BODY) - 40, alloc); + serf_bucket_aggregate_append(aggbkt, bkt); + + CuAssertIntEquals(tc, strlen(BODY), + (int)serf_bucket_get_remaining(copybkt)); + + /* When, requesting more than min_size, but more than in the first chunk + we will get min_size */ + status = serf_bucket_read(copybkt, SERF_READ_ALL_AVAIL, &data, &len); + CuAssertIntEquals(tc, APR_SUCCESS, status); + CuAssertIntEquals(tc, 35, len); + + /* We can read the rest */ + CuAssertIntEquals(tc, APR_EOF, discard_data(copybkt, &len)); + CuAssertIntEquals(tc, strlen(BODY) - 35, len); + + /* Just reuse the existing aggbkt. 
Fill again */ + copybkt = serf_bucket_copy_create(aggbkt, 45, alloc); + bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY, 20, alloc); + serf_bucket_aggregate_append(aggbkt, bkt); + bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY + 20, 20, alloc); + serf_bucket_aggregate_append(aggbkt, bkt); + bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY + 40, strlen(BODY) - 40, alloc); + serf_bucket_aggregate_append(aggbkt, bkt); + + CuAssertIntEquals(tc, strlen(BODY), + (int)serf_bucket_get_remaining(copybkt)); + + /* Now test if we can read everything as two vecs */ + status = serf_bucket_read_iovec(copybkt, SERF_READ_ALL_AVAIL, + 2, tgt_vecs, &vecs_used); + CuAssertIntEquals(tc, APR_EOF, status); for (i = 0; i < vecs_used; i++) actual_len += tgt_vecs[i].iov_len; CuAssertIntEquals(tc, strlen(BODY), actual_len); } -#endif /* Test that serf doesn't hang in an endless loop when a linebuf is in split-CRLF state. */ @@ -1537,29 +1604,6 @@ static void test_deflate_buckets(CuTest apr_pool_destroy(iterpool); } -static apr_status_t discard_data(serf_bucket_t *bkt, - apr_size_t *read_len) -{ - const char *data; - apr_size_t data_len; - apr_status_t status; - apr_size_t read; - - read = 0; - - do - { - status = serf_bucket_read(bkt, SERF_READ_ALL_AVAIL, &data, &data_len); - - if (!SERF_BUCKET_READ_ERROR(status)) { - read += data_len; - } - } while(status == APR_SUCCESS); - - *read_len = read; - return status; -} - static apr_status_t hold_open(void *baton, serf_bucket_t *aggbkt) { test_baton_t *tb = baton; @@ -2176,8 +2220,6 @@ static void test_http2_frame_bucket_basi { test_baton_t *tb = tc->testBaton; serf_bucket_alloc_t *alloc; - serf_bucket_t *hpack; - apr_size_t sz; serf_bucket_t *body_in; serf_bucket_t *frame_in; serf_bucket_t *frame_out; @@ -2238,9 +2280,11 @@ CuSuite *test_buckets(void) SUITE_ADD_TEST(suite, test_aggregate_buckets); SUITE_ADD_TEST(suite, test_aggregate_bucket_readline); SUITE_ADD_TEST(suite, test_header_buckets); + SUITE_ADD_TEST(suite, test_copy_bucket); SUITE_ADD_TEST(suite, 
test_linebuf_crlf_split); SUITE_ADD_TEST(suite, test_response_no_body_expected); SUITE_ADD_TEST(suite, test_random_eagain_in_response); + SUITE_ADD_TEST(suite, test_linebuf_fetch_crlf); SUITE_ADD_TEST(suite, test_dechunk_buckets); SUITE_ADD_TEST(suite, test_deflate_buckets); SUITE_ADD_TEST(suite, test_http2_unframe_buckets); @@ -2255,11 +2299,7 @@ CuSuite *test_buckets(void) SUITE_ADD_TEST(suite, test_deflate_4GBplus_buckets); #endif -#if 0 - SUITE_ADD_TEST(suite, test_serf_default_read_iovec); -#endif - - SUITE_ADD_TEST(suite, test_linebuf_fetch_crlf); + return suite; }