Hi,
Currently, some of the buckets (HTTP/2 and Brotli) use the callback set
with serf_bucket_aggregate_hold_open() to refill the contents on the fly.
However, the read(), read_iovec(), readline() and peek() implementations
for such aggregate buckets sometimes ignore the fact that the bucket has
just been refilled in this way, and thus generate unnecessary empty reads.
This patch tweaks how the aggregate buckets behave after calling hold_open()
and eliminates such unwanted empty reads wherever possible.
Log message:
[[[
Avoid unwanted empty reads after an aggregate bucket has been refilled
with the hold_open() callback.
Some of the buckets (HTTP/2 and Brotli) use the hold_open() callback
to refill the contents on the fly, and it would be better if they could
return the new data immediately, instead of artificially delaying it
until the next read.
* buckets/aggregate_buckets.c
(read_aggregate, serf_aggregate_readline, serf_aggregate_peek):
Check if the hold_open() callback has refilled the bucket list, and
continue reading if it did so and returned APR_SUCCESS.
* test/test_buckets.c
(test_hold_open_action_t, test_hold_open_context_t, test_hold_open,
create_test_hold_open_context): New helpers that allow mocking the
behavior of a hold_open() callback.
(test_aggregate_bucket_hold_open_read_iovec,
test_aggregate_bucket_hold_open_readline,
test_aggregate_bucket_hold_open_peek_eagain): New tests.
(test_buckets): Run new tests.
Patch by: Evgeny Kotkov <evgeny.kotkov{_AT_}visualsvn.com>
]]]
Regards,
Evgeny Kotkov
Index: buckets/aggregate_buckets.c
===================================================================
--- buckets/aggregate_buckets.c (revision 1787257)
+++ buckets/aggregate_buckets.c (working copy)
@@ -228,7 +228,9 @@ static apr_status_t read_aggregate(serf_bucket_t *
if (!ctx->list) {
if (ctx->hold_open) {
- return ctx->hold_open(ctx->hold_open_baton, bucket);
+ status = ctx->hold_open(ctx->hold_open_baton, bucket);
+ if (status || !ctx->list)
+ return status;
}
else {
return APR_EOF;
@@ -385,7 +387,9 @@ static apr_status_t serf_aggregate_readline(serf_b
if (!ctx->list) {
if (ctx->hold_open) {
- return ctx->hold_open(ctx->hold_open_baton, bucket);
+ status = ctx->hold_open(ctx->hold_open_baton, bucket);
+ if (status || !ctx->list)
+ return status;
}
else {
return APR_EOF;
@@ -408,12 +412,15 @@ static apr_status_t serf_aggregate_readline(serf_b
ctx->done = ctx->list;
ctx->list = next_list;
- /* If we have no more in our list, return EOF. */
+ /* If we have no more in our list, return EOF... */
if (!ctx->list) {
ctx->last = NULL;
if (ctx->hold_open) {
- return ctx->hold_open(ctx->hold_open_baton, bucket);
+ status = ctx->hold_open(ctx->hold_open_baton, bucket);
+ if (status || !ctx->list)
+ return status;
+ /* ...but continue if hold_open() refilled the list. */
}
else {
return APR_EOF;
@@ -447,8 +454,9 @@ static apr_status_t serf_aggregate_peek(serf_bucke
if (ctx->hold_open) {
status = ctx->hold_open(ctx->hold_open_baton, bucket);
if (APR_STATUS_IS_EAGAIN(status))
- status = APR_SUCCESS;
- return status;
+ return APR_SUCCESS;
+ else if (status || !ctx->list)
+ return status;
}
else {
return APR_EOF;
Index: test/test_buckets.c
===================================================================
--- test/test_buckets.c (revision 1787257)
+++ test/test_buckets.c (working copy)
@@ -928,6 +928,174 @@ static void test_aggregate_bucket_readline(CuTest
serf_bucket_destroy(aggbkt);
}
+typedef struct test_hold_open_action_t {
+ const char *append_str;
+ apr_status_t status;
+} test_hold_open_action_t;
+
+typedef struct test_hold_open_context_t {
+ test_hold_open_action_t *actions;
+ int len;
+ int current_action;
+} test_hold_open_context_t;
+
+static apr_status_t test_hold_open(void *baton, serf_bucket_t *aggbkt)
+{
+ test_hold_open_context_t *ctx = baton;
+
+ if (ctx->current_action < ctx->len) {
+ test_hold_open_action_t *action = &ctx->actions[ctx->current_action];
+
+ if (action->append_str) {
+ serf_bucket_t *bkt = SERF_BUCKET_SIMPLE_STRING(action->append_str,
+ aggbkt->allocator);
+ serf_bucket_aggregate_append(aggbkt, bkt);
+ }
+ ctx->current_action++;
+ return action->status;
+ }
+ else {
+ return REPORT_TEST_SUITE_ERROR();
+ }
+}
+
+static test_hold_open_context_t *
+create_test_hold_open_context(test_hold_open_action_t *actions, int len,
+ serf_bucket_alloc_t *alloc)
+{
+ test_hold_open_context_t *ctx = serf_bucket_mem_calloc(alloc,
sizeof(*ctx));
+
+ ctx->actions = actions;
+ ctx->len = len;
+ ctx->current_action = 0;
+
+ return ctx;
+}
+
+static void test_aggregate_bucket_hold_open_read_iovec(CuTest *tc)
+{
+ test_baton_t *tb = tc->testBaton;
+ serf_bucket_alloc_t *alloc = test__create_bucket_allocator(tc, tb->pool);
+ test_hold_open_context_t *ctx;
+ serf_bucket_t *aggbkt;
+ apr_status_t status;
+ struct iovec vecs[3];
+ struct iovec *vec;
+ int vecs_used;
+ test_hold_open_action_t actions[] = {
+ { "a", APR_SUCCESS },
+ { "", APR_SUCCESS },
+ { "b", APR_SUCCESS },
+ { "c", APR_SUCCESS },
+ { NULL, APR_EOF }
+ };
+
+ aggbkt = serf_bucket_aggregate_create(alloc);
+ ctx = create_test_hold_open_context(actions,
+ sizeof(actions) / sizeof(actions[0]),
+ alloc);
+ serf_bucket_aggregate_hold_open(aggbkt, test_hold_open, ctx);
+
+ /* See if we can read everything as three vecs. */
+ status = serf_bucket_read_iovec(aggbkt, SERF_READ_ALL_AVAIL, 3,
+ vecs, &vecs_used);
+ CuAssertIntEquals(tc, APR_EOF, status);
+ CuAssertIntEquals(tc, 3, vecs_used);
+
+ vec = &vecs[0];
+ CuAssertIntEquals(tc, 1, (int)vec->iov_len);
+ CuAssert(tc, vec->iov_base, strncmp("a", vec->iov_base, vec->iov_len) ==
0);
+ vec = &vecs[1];
+ CuAssertIntEquals(tc, 1, (int)vec->iov_len);
+ CuAssert(tc, vec->iov_base, strncmp("b", vec->iov_base, vec->iov_len) ==
0);
+ vec = &vecs[2];
+ CuAssertIntEquals(tc, 1, (int)vec->iov_len);
+ CuAssert(tc, vec->iov_base, strncmp("c", vec->iov_base, vec->iov_len) ==
0);
+
+ serf_bucket_destroy(aggbkt);
+}
+
+static void test_aggregate_bucket_hold_open_readline(CuTest *tc)
+{
+ test_baton_t *tb = tc->testBaton;
+ serf_bucket_alloc_t *alloc = test__create_bucket_allocator(tc, tb->pool);
+ test_hold_open_context_t *ctx;
+ serf_bucket_t *aggbkt;
+ apr_status_t status;
+ int found;
+ const char *data;
+ apr_size_t len;
+ test_hold_open_action_t actions[] = {
+ { "a" CRLF, APR_SUCCESS },
+ { "", APR_SUCCESS },
+ { "b", APR_SUCCESS },
+ { CRLF, APR_SUCCESS },
+ { NULL, APR_EOF }
+ };
+
+ aggbkt = serf_bucket_aggregate_create(alloc);
+ ctx = create_test_hold_open_context(actions,
+ sizeof(actions) / sizeof(actions[0]),
+ alloc);
+ serf_bucket_aggregate_hold_open(aggbkt, test_hold_open, ctx);
+
+ status = serf_bucket_readline(aggbkt, SERF_NEWLINE_ANY, &found,
+ &data, &len);
+ CuAssertIntEquals(tc, APR_SUCCESS, status);
+ CuAssertIntEquals(tc, SERF_NEWLINE_CRLF, found);
+ CuAssertIntEquals(tc, 3, (int)len);
+ CuAssert(tc, data, strncmp("a" CRLF, data, len) == 0);
+
+ status = serf_bucket_readline(aggbkt, SERF_NEWLINE_ANY, &found,
+ &data, &len);
+ CuAssertIntEquals(tc, APR_SUCCESS, status);
+ CuAssertIntEquals(tc, SERF_NEWLINE_NONE, found);
+ CuAssertIntEquals(tc, 1, (int)len);
+ CuAssert(tc, data, strncmp("b", data, len) == 0);
+
+ status = serf_bucket_readline(aggbkt, SERF_NEWLINE_ANY, &found,
+ &data, &len);
+ CuAssertIntEquals(tc, APR_EOF, status);
+ CuAssertIntEquals(tc, SERF_NEWLINE_CRLF, found);
+ CuAssertIntEquals(tc, 2, (int)len);
+ CuAssert(tc, data, strncmp(CRLF, data, len) == 0);
+
+ serf_bucket_destroy(aggbkt);
+}
+
+static void test_aggregate_bucket_hold_open_peek_eagain(CuTest *tc)
+{
+ test_baton_t *tb = tc->testBaton;
+ serf_bucket_alloc_t *alloc = test__create_bucket_allocator(tc, tb->pool);
+ test_hold_open_context_t *ctx;
+ serf_bucket_t *aggbkt;
+ apr_status_t status;
+ const char *data;
+ apr_size_t len;
+ test_hold_open_action_t actions[] = {
+ { NULL, APR_EAGAIN },
+ { "abc", APR_SUCCESS },
+ { NULL, APR_EOF }
+ };
+
+ aggbkt = serf_bucket_aggregate_create(alloc);
+ ctx = create_test_hold_open_context(actions,
+ sizeof(actions) / sizeof(actions[0]),
+ alloc);
+ serf_bucket_aggregate_hold_open(aggbkt, test_hold_open, ctx);
+
+ status = serf_bucket_peek(aggbkt, &data, &len);
+ CuAssertIntEquals(tc, APR_SUCCESS, status);
+ CuAssertIntEquals(tc, 0, (int)len);
+
+ status = serf_bucket_peek(aggbkt, &data, &len);
+ CuAssertIntEquals(tc, APR_EOF, status);
+ CuAssertIntEquals(tc, 3, (int)len);
+ CuAssert(tc, data, strncmp("abc", data, len) == 0);
+
+ serf_bucket_destroy(aggbkt);
+}
+
/* Test for issue: the server aborts the connection in the middle of
streaming the body of the response, where the length was set with the
Content-Length header. Test that we get a decent error code from the
@@ -3193,6 +3361,9 @@ CuSuite *test_buckets(void)
SUITE_ADD_TEST(suite, test_iovec_buckets);
SUITE_ADD_TEST(suite, test_aggregate_buckets);
SUITE_ADD_TEST(suite, test_aggregate_bucket_readline);
+ SUITE_ADD_TEST(suite, test_aggregate_bucket_hold_open_read_iovec);
+ SUITE_ADD_TEST(suite, test_aggregate_bucket_hold_open_readline);
+ SUITE_ADD_TEST(suite, test_aggregate_bucket_hold_open_peek_eagain);
SUITE_ADD_TEST(suite, test_header_buckets);
SUITE_ADD_TEST(suite, test_copy_bucket);
SUITE_ADD_TEST(suite, test_linebuf_crlf_split);