Author: rhuijben
Date: Fri Nov 13 18:12:12 2015
New Revision: 1714246

URL: http://svn.apache.org/viewvc?rev=1714246&view=rev
Log:
Implement split buckets as a real fix for the data lifetime issues
I uncovered in r1713435 by abusing the limit bucket.

The previous fix solved the symptoms for the test, but it still left
iovecs pointing into the data of buckets that were then read from
again. We didn't see that in the tests because simple buckets backed
by static data don't exercise the bucket contract that data may go
away at the next read.

My primary use case is a request body bucket that we need to forward
in http2 data frames of a certain maximum size (while also sticking to
window limits). These split buckets allow putting the first part in
the write queue while keeping the tail for a later frame.
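
For illustration, a minimal sketch of that use case (FRAME_SIZE,
body_bucket and enqueue_data_frame() are hypothetical names, not part
of this commit):

    serf_bucket_t *head, *tail;

    /* HEAD yields at most FRAME_SIZE bytes of the body; TAIL keeps the
       rest readable later without invalidating data already handed out
       from HEAD. */
    serf_bucket_split_create(&head, &tail, body_bucket,
                             FRAME_SIZE /* min_chunk_size */,
                             FRAME_SIZE /* max_chunk_size */);

    enqueue_data_frame(head); /* hypothetical: queue one DATA frame */
    body_bucket = tail;       /* split again for the next frame */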

* buckets/aggregate_buckets.c
  (read_aggregate): When an iovec_read returns success but no iovec items,
    stop trying to read again and return what we have now. On the next
    read the head (which filled the previous items in the iovec) will be
    destroyed, so the tail can continue.

* buckets/limit_buckets.c
  (serf_limit_read,
   serf_limit_readline,
   serf_limit_read_iovec): Return the new, slightly more generic error code.

* buckets/split_buckets.c
  New file.

* serf.h
  (SERF_ERROR_TRUNCATED_STREAM): New error.

* serf_bucket_types.h
  (serf_bucket_split_create): New function.

* test/test_buckets.c
  (test_limit_buckets): Extract the invalid use case into...
  (test_split_buckets): ... this new function, testing the split buckets.

Added:
    serf/trunk/buckets/split_buckets.c   (with props)
Modified:
    serf/trunk/buckets/aggregate_buckets.c
    serf/trunk/buckets/limit_buckets.c
    serf/trunk/serf.h
    serf/trunk/serf_bucket_types.h
    serf/trunk/test/test_buckets.c

Modified: serf/trunk/buckets/aggregate_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/buckets/aggregate_buckets.c?rev=1714246&r1=1714245&r2=1714246&view=diff
==============================================================================
--- serf/trunk/buckets/aggregate_buckets.c (original)
+++ serf/trunk/buckets/aggregate_buckets.c Fri Nov 13 18:12:12 2015
@@ -319,6 +319,16 @@ static apr_status_t read_aggregate(serf_
                 return APR_SUCCESS;
             }
         }
+        else if (!status) {
+            /* Success and no data. Let's return what we have.
+               Better luck next time.
+
+               This scenario is triggered by test_split_buckets(),
+               in a case where EAGAIN is really not what we want.
+             */
+
+            return APR_SUCCESS;
+        }
     }
 
     return status;

Modified: serf/trunk/buckets/limit_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/buckets/limit_buckets.c?rev=1714246&r1=1714245&r2=1714246&view=diff
==============================================================================
--- serf/trunk/buckets/limit_buckets.c (original)
+++ serf/trunk/buckets/limit_buckets.c Fri Nov 13 18:12:12 2015
@@ -68,7 +68,7 @@ static apr_status_t serf_limit_read(serf
             status = APR_EOF;
         }
         else if (APR_STATUS_IS_EOF(status) && ctx->remaining) {
-            status = SERF_ERROR_TRUNCATED_HTTP_RESPONSE;
+            status = SERF_ERROR_TRUNCATED_STREAM;
         }
     }
 
@@ -106,7 +106,7 @@ static apr_status_t serf_limit_readline(
             status = APR_EOF;
         }
         else if (APR_STATUS_IS_EOF(status) && ctx->remaining) {
-            status = SERF_ERROR_TRUNCATED_HTTP_RESPONSE;
+            status = SERF_ERROR_TRUNCATED_STREAM;
         }
     }
 
@@ -147,7 +147,7 @@ static apr_status_t serf_limit_read_iove
           status = APR_EOF;
       }
       else if (APR_STATUS_IS_EOF(status) && ctx->remaining) {
-          status = SERF_ERROR_TRUNCATED_HTTP_RESPONSE;
+          status = SERF_ERROR_TRUNCATED_STREAM;
       }
   }
 

Added: serf/trunk/buckets/split_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/buckets/split_buckets.c?rev=1714246&view=auto
==============================================================================
--- serf/trunk/buckets/split_buckets.c (added)
+++ serf/trunk/buckets/split_buckets.c Fri Nov 13 18:12:12 2015
@@ -0,0 +1,432 @@
+/* ====================================================================
+ *    Licensed to the Apache Software Foundation (ASF) under one
+ *    or more contributor license agreements.  See the NOTICE file
+ *    distributed with this work for additional information
+ *    regarding copyright ownership.  The ASF licenses this file
+ *    to you under the Apache License, Version 2.0 (the
+ *    "License"); you may not use this file except in compliance
+ *    with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing,
+ *    software distributed under the License is distributed on an
+ *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *    KIND, either express or implied.  See the License for the
+ *    specific language governing permissions and limitations
+ *    under the License.
+ * ====================================================================
+ */
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+#include "serf_private.h"
+
+typedef struct split_context_t {
+    serf_bucket_t *stream;
+    struct split_stream_ctx_t *head, *tail;
+
+    bool want_size;
+} split_context_t;
+
+typedef struct split_stream_ctx_t {
+    split_context_t *ctx;
+
+    apr_size_t read_size;
+    apr_size_t fixed_size;
+    apr_size_t min_size;
+    apr_size_t max_size;
+    apr_uint64_t tail_size;
+
+    struct split_stream_ctx_t *prev, *next;
+
+    bool at_eof, cant_read;
+} split_stream_ctx_t;
+
+static void split_detach_head(split_stream_ctx_t *sctx)
+{
+    split_context_t *ctx = sctx->ctx;
+
+    if (!ctx || ctx->head != sctx)
+        return;
+
+    if (ctx->tail != sctx) {
+        /* We can detach now */
+        ctx->head = sctx->next;
+        sctx->next->prev = NULL;
+
+        /* Did somebody ask for the size while it wasn't possible? 
+           Perhaps we can retrieve and store it now */
+        if (ctx->want_size) {
+            ctx->want_size = false;
+            ctx->head->tail_size = serf_bucket_get_remaining(ctx->stream);
+
+            if (ctx->head->tail_size != SERF_LENGTH_UNKNOWN)
+                ctx->head->tail_size -= ctx->head->read_size;
+        }
+        else if (sctx->tail_size != SERF_LENGTH_UNKNOWN) {
+
+            /* If we have a cached total size, move it to the new head */
+            ctx->head->tail_size = sctx->tail_size - sctx->read_size
+                                                   + ctx->head->read_size;
+        }
+    }
+    else {
+        serf_bucket_t *stream = ctx->stream;
+
+        serf_bucket_mem_free(stream->allocator, ctx);
+        serf_bucket_destroy(stream);
+    }
+
+    sctx->prev = sctx->next = NULL;
+    sctx->ctx = NULL;
+}
+
+static apr_status_t serf_split_read(serf_bucket_t *bucket,
+                                    apr_size_t requested,
+                                    const char **data,
+                                    apr_size_t *len)
+{
+    split_stream_ctx_t *sctx = bucket->data;
+    split_context_t *ctx = sctx->ctx;
+    apr_status_t status;
+
+    if (! ctx || sctx->at_eof) {
+        split_detach_head(sctx);
+        *data = NULL;
+        *len = 0;
+        return APR_EOF;
+    }
+    else if (sctx->prev) {
+        /* Not the current head */
+        *data = NULL;
+        *len = 0;
+        if (sctx->prev->prev || !sctx->prev->at_eof)
+            return APR_EAGAIN; /* Not ready soon */
+
+        return APR_SUCCESS; /* Most likely ready at next read */
+    }
+
+    if (sctx->max_size != SERF_READ_ALL_AVAIL
+        && requested > (sctx->max_size - sctx->read_size))
+    {
+      requested = (sctx->max_size - sctx->read_size);
+    }
+
+    status = serf_bucket_read(ctx->stream, requested, data, len);
+
+    if (!SERF_BUCKET_READ_ERROR(status)) {
+        sctx->cant_read = (*len != 0);
+        sctx->read_size += *len;
+
+        if (sctx->read_size >= sctx->min_size) {
+            /* We read enough. Fix the final length now */
+            sctx->at_eof = true;
+            sctx->fixed_size = sctx->max_size = sctx->read_size;
+            status = APR_EOF;
+        }
+        else if (APR_STATUS_IS_EOF(status)
+                 && (sctx->fixed_size && sctx->read_size != sctx->fixed_size)) {
+
+            /* We promised more data via get_remaining() than we can deliver */
+            status = SERF_ERROR_TRUNCATED_STREAM;
+        }
+    }
+    else
+      sctx->cant_read = false;
+
+    return status;
+}
+
+static apr_status_t serf_split_read_iovec(serf_bucket_t *bucket,
+                                          apr_size_t requested,
+                                          int vecs_size,
+                                          struct iovec *vecs,
+                                          int *vecs_used)
+{
+    split_stream_ctx_t *sctx = bucket->data;
+    split_context_t *ctx = sctx->ctx;
+    apr_status_t status;
+
+    if (! ctx || sctx->at_eof) {
+        split_detach_head(sctx);
+        *vecs_used = 0;
+        return APR_EOF;
+    }
+    else if (sctx->prev) {
+        /* Not the current head */
+        *vecs_used = 0;
+        if (sctx->prev->prev || !sctx->prev->at_eof)
+            return APR_EAGAIN; /* Not ready soon */
+
+        return APR_SUCCESS; /* Most likely ready at next read */
+    }
+
+    if (sctx->max_size != SERF_READ_ALL_AVAIL
+        && requested > (sctx->max_size - sctx->read_size))
+    {
+        requested = (sctx->max_size - sctx->read_size);
+    }
+
+    status = serf_bucket_read_iovec(ctx->stream, requested, vecs_size,
+                                    vecs, vecs_used);
+
+    if (!SERF_BUCKET_READ_ERROR(status)) {
+        apr_size_t len = 0;
+        int i;
+
+        for (i = 0; i < *vecs_used; i++)
+            len += vecs[i].iov_len;
+
+        sctx->cant_read = (len != 0);
+        sctx->read_size += len;
+
+        if (sctx->read_size >= sctx->min_size) {
+            /* We read enough. Fix the final length now */
+            sctx->at_eof = true;
+            sctx->fixed_size = sctx->max_size = sctx->read_size;
+            status = APR_EOF;
+        }
+        else if (APR_STATUS_IS_EOF(status)) {
+            sctx->at_eof = true;
+
+            if (sctx->fixed_size && sctx->read_size != sctx->fixed_size) {
+                /* We promised more data via get_remaining() than we can
+                   deliver. -> BAD get_remaining()  */
+                status = SERF_ERROR_TRUNCATED_STREAM;
+            }
+        }
+    }
+    else
+      sctx->cant_read = false;
+
+    return status;
+}
+
+static apr_status_t serf_split_peek(serf_bucket_t *bucket,
+                                    const char **data,
+                                    apr_size_t *len)
+{
+    split_stream_ctx_t *sctx = bucket->data;
+    split_context_t *ctx = sctx->ctx;
+    apr_status_t status;
+
+    if (! ctx || sctx->at_eof) {
+        split_detach_head(sctx);
+        *data = "";
+        *len = 0;
+        return APR_EOF;
+    }
+    else if (sctx->prev) {
+        /* Not the current head */
+        *data = "";
+        *len = 0;
+        if (sctx->prev->prev || !sctx->prev->at_eof)
+            return APR_EAGAIN; /* Not ready soon */
+
+        return APR_SUCCESS; /* Most likely ready at next read */
+    }
+
+    status = serf_bucket_peek(ctx->stream, data, len);
+
+    if (!SERF_BUCKET_READ_ERROR(status)
+        && (*len > (sctx->max_size - sctx->read_size)))
+    {
+        /* We can read to the max size. If we provide this
+           much data now, we must promise to return as much
+           later
+        */
+        sctx->min_size = sctx->fixed_size = sctx->max_size;
+        *len = (sctx->max_size - sctx->read_size);
+        sctx->cant_read = (*len > 0);
+    }
+    else
+        sctx->cant_read = false;
+
+    return status;
+}
+
+static apr_uint64_t serf_split_get_remaining(serf_bucket_t *bucket)
+{
+    split_stream_ctx_t *sctx = bucket->data;
+    split_context_t *ctx = sctx->ctx;
+    split_stream_ctx_t *head;
+    apr_uint64_t remaining;
+
+    if (!ctx) {
+        return 0; /* at eof */
+    }
+    else if (ctx->head == sctx) {
+        /* We are HEAD. We hereby unlock the data to allow reading the size */
+        sctx->cant_read = false;
+    }
+
+    if (sctx->fixed_size) {
+        return sctx->fixed_size - sctx->read_size; /* already calculated */
+    }
+
+    /* Do we know the total size? */
+    head = ctx->head;
+
+    if (head->tail_size == SERF_LENGTH_UNKNOWN) {
+        if (head->cant_read) {
+            /* Can't obtain the size without unlocking data */
+            ctx->want_size = true;
+
+            return SERF_LENGTH_UNKNOWN;
+        }
+        head->tail_size = serf_bucket_get_remaining(ctx->stream);
+
+        if (head->tail_size == SERF_LENGTH_UNKNOWN)
+            return SERF_LENGTH_UNKNOWN;
+
+        /* Add what we already have to avoid updating on every read */
+        head->tail_size += head->read_size;
+    }
+
+    remaining = head->tail_size;
+    /* And now we fix the sizes of the buckets until we get to
+       the one we're interested in */
+    while (head) {
+
+        if (!head->fixed_size) {
+            /* Size not decided yet. Let's make this chunk as big
+               as allowed */
+            head->fixed_size = (remaining < head->max_size)
+                                ? (apr_size_t)remaining
+                                : head->max_size;
+
+            /* Disable dynamic sizing now */
+            head->min_size = head->max_size = head->fixed_size;
+        }
+
+        if (head == sctx) {
+            /* We got the information we need. Exit now to avoid
+               fixing the length of more buckets than needed */
+            return sctx->fixed_size - sctx->read_size;
+        }
+
+        remaining -= head->fixed_size;
+        head = head->next;
+    }
+
+    return SERF_LENGTH_UNKNOWN; /* Hit NULL before our bucket??? */
+}
+
+static void serf_split_destroy(serf_bucket_t *bucket)
+{
+    split_stream_ctx_t *sctx = bucket->data;
+    split_context_t *ctx = sctx->ctx;
+
+    /* Are we the current read bucket */
+    if (!sctx->prev) {
+        if (!sctx->at_eof && sctx->fixed_size) {
+            /* Ouch, we promised to read a specific amount and
+               then didn't keep our promise... */
+            serf__bucket_drain(bucket);
+        }
+
+        split_detach_head(sctx);
+    }
+    else {
+        /* We are destroyed before being read... should never happen,
+           unless the entire chain is destroyed */
+
+        split_stream_ctx_t *h = sctx->next;
+
+        /* We didn't read what we assumed to read. Fix calculations
+           if we can. All data will shift to tail. Let's hope nobody
+           tried to call get_remaining() on the final tail... */
+
+        while (h) {
+            h->tail_size = SERF_LENGTH_UNKNOWN;
+            h = h->next;
+        }
+
+        /* Remove ourself from list */
+        sctx->prev->next = sctx->next;
+        if (sctx->next)
+            sctx->next->prev = sctx->prev;
+        else
+            ctx->tail = sctx->prev;
+    }
+
+    serf_default_destroy_and_data(bucket);
+}
+
+static apr_status_t serf_split_set_config(serf_bucket_t *bucket,
+                                          serf_config_t *config)
+{
+    split_stream_ctx_t *sctx = bucket->data;
+    split_context_t *ctx = sctx->ctx;
+
+    if (ctx && !sctx->prev)
+        return serf_bucket_set_config(ctx->stream, config);
+
+    return APR_SUCCESS;
+}
+
+#define SERF_BUCKET_IS__SPLIT(b) SERF_BUCKET_CHECK((b), _split)
+static const serf_bucket_type_t serf_bucket_type__split =
+{
+  "SPLIT",
+  serf_split_read,
+  serf_default_readline,
+  serf_split_read_iovec,
+  serf_default_read_for_sendfile,
+  serf_buckets_are_v2,
+  serf_split_peek,
+  serf_split_destroy,
+  serf_default_read_bucket,
+  serf_split_get_remaining,
+  serf_split_set_config
+};
+
+void serf_bucket_split_create(serf_bucket_t **head,
+                              serf_bucket_t **tail,
+                              serf_bucket_t *stream,
+                              apr_size_t min_chunk_size,
+                              apr_size_t max_chunk_size)
+{
+  split_stream_ctx_t *tail_ctx, *head_ctx;
+  split_context_t *ctx;
+  serf_bucket_alloc_t *allocator = stream->allocator;
+
+  tail_ctx = serf_bucket_mem_calloc(allocator, sizeof(*tail_ctx));
+  tail_ctx->tail_size = SERF_LENGTH_UNKNOWN;
+
+  if (SERF_BUCKET_IS__SPLIT(stream)) {
+    ctx = stream->data;
+    *head = stream;
+
+    head_ctx = tail_ctx->prev = ctx->tail;
+    ctx->tail->next = tail_ctx;
+    ctx->tail = tail_ctx;
+  }
+  else {
+    ctx = serf_bucket_mem_calloc(allocator, sizeof(*ctx));
+    ctx->stream = stream;
+
+    head_ctx = serf_bucket_mem_calloc(allocator, sizeof(*head_ctx));
+    head_ctx->ctx = ctx;
+
+    ctx->tail = head_ctx->next = tail_ctx;
+    ctx->head = tail_ctx->prev = head_ctx;
+
+    *head = serf_bucket_create(&serf_bucket_type__split, allocator,
+                               head_ctx);
+  }
+
+  *tail = serf_bucket_create(&serf_bucket_type__split, allocator, tail_ctx);
+
+  tail_ctx->ctx = ctx;
+  /* head_ctx->fixed_size = 0; // Unknown */
+  head_ctx->min_size = min_chunk_size;
+  head_ctx->max_size = max_chunk_size;
+
+  /* tail_ctx->fixed_size = 0; // Unknown */
+  tail_ctx->min_size = SERF_READ_ALL_AVAIL;
+  tail_ctx->max_size = SERF_READ_ALL_AVAIL;
+}

Propchange: serf/trunk/buckets/split_buckets.c
------------------------------------------------------------------------------
    svn:eol-style = native

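Note that serf_bucket_split_create() above special-cases being handed a
previous tail: instead of stacking split buckets it extends the existing
chain. A short sketch of that chaining (variable names illustrative):

    serf_bucket_t *h1, *t1, *h2, *t2;

    serf_bucket_split_create(&h1, &t1, stream, 4096, 4096);

    /* t1 is itself a _split bucket, so this call takes the
       SERF_BUCKET_IS__SPLIT() branch above and appends a new tail
       context to the same split_context_t instead of wrapping t1. */
    serf_bucket_split_create(&h2, &t2, t1, 4096, 4096);

    /* h2 == t1: the previous tail becomes the new head. */
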
Modified: serf/trunk/serf.h
URL: http://svn.apache.org/viewvc/serf/trunk/serf.h?rev=1714246&r1=1714245&r2=1714246&view=diff
==============================================================================
--- serf/trunk/serf.h (original)
+++ serf/trunk/serf.h Fri Nov 13 18:12:12 2015
@@ -107,6 +107,8 @@ typedef struct serf_config_t serf_config
 #define SERF_ERROR_RESPONSE_HEADER_TOO_LONG (SERF_ERROR_START + 11)
 /* The connection to the server timed out. */
 #define SERF_ERROR_CONNECTION_TIMEDOUT (SERF_ERROR_START + 12)
+/* The stream returned less data than was expected */
+#define SERF_ERROR_TRUNCATED_STREAM (SERF_ERROR_START + 13)
 
 /* Http-2 stream errors, mapped into our error range */
 #define SERF_ERROR_HTTP2_NO_ERROR (SERF_ERROR_START + 50)

Modified: serf/trunk/serf_bucket_types.h
URL: http://svn.apache.org/viewvc/serf/trunk/serf_bucket_types.h?rev=1714246&r1=1714245&r2=1714246&view=diff
==============================================================================
--- serf/trunk/serf_bucket_types.h (original)
+++ serf/trunk/serf_bucket_types.h Fri Nov 13 18:12:12 2015
@@ -783,6 +783,28 @@ serf_bucket_t *serf_bucket_prefix_create
 
 /* ==================================================================== */
 
+/* Creates two buckets, *HEAD and *TAIL, which together contain the output
+   of STREAM. If there is enough data in STREAM, HEAD will be a bucket of
+   at least MIN_CHUNK_SIZE bytes and will never be larger than
+   MAX_CHUNK_SIZE.
+
+   If STREAM reaches EOF before MIN_CHUNK_SIZE bytes are read, HEAD will
+   contain all the available data, while TAIL is immediately at EOF.
+
+   HEAD and TAIL make sure that data read from TAIL will not break the
+   data availability promises on HEAD. Passing an existing tail of this
+   function as the new stream may be handled specially, but the read
+   promises on all nodes ahead of the stream will still hold.
+
+   HEAD and TAIL are allocated in STREAM->allocator. STREAM will be
+   destroyed when no longer referenced or after EOF.
+ */
+void serf_bucket_split_create(serf_bucket_t **head,
+                              serf_bucket_t **tail,
+                              serf_bucket_t *stream,
+                              apr_size_t min_chunk_size,
+                              apr_size_t max_chunk_size);
+
+
 /* ### do we need a PIPE bucket type? they are simple apr_file_t objects */
 
 

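A usage sketch for the declaration above, showing the ordering promise
(STREAM is assumed to be an existing bucket; error handling elided):

    serf_bucket_t *head, *tail;
    const char *data;
    apr_size_t len;
    apr_status_t status;

    serf_bucket_split_create(&head, &tail, stream, 13, 13);

    /* Reading TAIL before HEAD is drained never returns HEAD's data:
       while HEAD is unread this yields APR_EAGAIN with zero bytes, so
       iovecs already handed out by HEAD stay valid. */
    status = serf_bucket_read(tail, SERF_READ_ALL_AVAIL, &data, &len);
    /* status == APR_EAGAIN, len == 0 */

    /* Drain HEAD first... */
    do {
        status = serf_bucket_read(head, SERF_READ_ALL_AVAIL, &data, &len);
        if (SERF_BUCKET_READ_ERROR(status))
            break; /* error handling elided */
        /* consume data[0..len-1] before the next read */
    } while (!APR_STATUS_IS_EOF(status));
    serf_bucket_destroy(head);

    /* ...then TAIL delivers the remainder of STREAM. */
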
Modified: serf/trunk/test/test_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/test/test_buckets.c?rev=1714246&r1=1714245&r2=1714246&view=diff
==============================================================================
--- serf/trunk/test/test_buckets.c (original)
+++ serf/trunk/test/test_buckets.c Fri Nov 13 18:12:12 2015
@@ -2188,7 +2188,6 @@ static void test_limit_buckets(CuTest *t
   apr_size_t len;
   serf_bucket_t *raw;
   serf_bucket_t *limit;
-  serf_bucket_t *agg;
   apr_status_t status;
 
   /* The normal usecase */
@@ -2198,36 +2197,69 @@ static void test_limit_buckets(CuTest *t
   read_and_check_bucket(tc, raw, "NOPQRSTUVWXYZ");
   serf_bucket_destroy(limit);
 
+  /* What if there is not enough data? */
+  raw = SERF_BUCKET_SIMPLE_STRING("ABCDE", alloc);
+  limit = serf_bucket_limit_create(raw, 13, alloc);
+
+  status = read_all(limit, buffer, sizeof(buffer), &len);
+  CuAssertIntEquals(tc, SERF_ERROR_TRUNCATED_STREAM, status);
+  serf_bucket_destroy(limit);
+
+  {
+    const char *data;
+    int found;
+
+    raw = SERF_BUCKET_SIMPLE_STRING("ABCDEF\nGHIJKLMNOP", alloc);
+    limit = serf_bucket_limit_create(raw, 5, alloc);
+
+    CuAssertIntEquals(tc, APR_EOF,
+                      serf_bucket_readline(limit, SERF_NEWLINE_ANY, &found,
+                                           &data, &len));
+    CuAssertIntEquals(tc, SERF_NEWLINE_NONE, found);
+    CuAssertIntEquals(tc, len, 5); /* > 5 is over limit -> bug */
+    DRAIN_BUCKET(raw);
+    serf_bucket_destroy(limit);
+  }
+}
+
+static void test_split_buckets(CuTest *tc)
+{
+  test_baton_t *tb = tc->testBaton;
+  serf_bucket_alloc_t *alloc = tb->bkt_alloc;
+  char buffer[26];
+  apr_size_t len;
+  serf_bucket_t *raw;
+  serf_bucket_t *head, *tail;
+  serf_bucket_t *agg;
+  apr_status_t status;
+
   /* The normal usecase but different way */
   raw = SERF_BUCKET_SIMPLE_STRING("ABCDEFGHIJKLMNOPQRSTUVWXYZ", alloc);
-  limit = serf_bucket_limit_create(
-                serf_bucket_barrier_create(raw, alloc),
-                13, alloc);
+  serf_bucket_split_create(&head, &tail, raw, 13, 13);
   agg = serf_bucket_aggregate_create(alloc);
-  serf_bucket_aggregate_prepend(agg, limit);
+  serf_bucket_aggregate_prepend(agg, head);
   serf_bucket_aggregate_append(agg,
                                serf_bucket_simple_create("!", 1, NULL, NULL,
                                                          alloc));
-  serf_bucket_aggregate_append(agg, raw);
+  serf_bucket_aggregate_append(agg, tail);
   read_and_check_bucket(tc, agg, "ABCDEFGHIJKLM!NOPQRSTUVWXYZ");
   serf_bucket_destroy(agg);
 
   /* What if there is not enough data? */
   raw = SERF_BUCKET_SIMPLE_STRING("ABCDE", alloc);
-  limit = serf_bucket_limit_create(raw, 13, alloc);
+  serf_bucket_split_create(&head, &tail, raw, 13, 13);
 
-  status = read_all(limit, buffer, sizeof(buffer), &len);
-  CuAssertIntEquals(tc, SERF_ERROR_TRUNCATED_HTTP_RESPONSE, status);
-  serf_bucket_destroy(limit);
+  status = read_all(head, buffer, sizeof(buffer), &len);
+  CuAssertIntEquals(tc, APR_EOF, status);
+  serf_bucket_destroy(head);
+  serf_bucket_destroy(tail);
 
   /* And now a really bad case of the 'different way' */
   raw = SERF_BUCKET_SIMPLE_STRING("ABCDE", alloc);
-  limit = serf_bucket_limit_create(
-                serf_bucket_barrier_create(raw, alloc),
-                5, alloc);
+  serf_bucket_split_create(&head, &tail, raw, 5, 5);
   agg = serf_bucket_aggregate_create(alloc);
-  serf_bucket_aggregate_prepend(agg, limit);
-  serf_bucket_aggregate_append(agg, raw);
+  serf_bucket_aggregate_prepend(agg, head);
+  serf_bucket_aggregate_append(agg, tail);
 
   {
     struct iovec vecs[12];
@@ -2237,13 +2269,18 @@ static void test_limit_buckets(CuTest *t
        as reading the last part destroyed the data pointed to by
        iovecs of the first */
 
-    CuAssertIntEquals(tc, APR_EOF,
+    CuAssertIntEquals(tc, APR_SUCCESS,
                       serf_bucket_read_iovec(agg, SERF_READ_ALL_AVAIL,
                                              12, vecs, &vecs_read));
 
     serf__copy_iovec(buffer, &len, vecs, vecs_read);
 
     CuAssertIntEquals(tc, 5, len);
+
+    CuAssertIntEquals(tc, APR_EOF,
+                      serf_bucket_read_iovec(agg, SERF_READ_ALL_AVAIL,
+                                             12, vecs, &vecs_read));
+    CuAssertIntEquals(tc, 0, vecs_read);
   }
   serf_bucket_destroy(agg);
 
@@ -2252,14 +2289,17 @@ static void test_limit_buckets(CuTest *t
     int found;
 
     raw = SERF_BUCKET_SIMPLE_STRING("ABCDEF\nGHIJKLMNOP", alloc);
-    limit = serf_bucket_limit_create(raw, 5, alloc);
+    serf_bucket_split_create(&head, &tail, raw, 5, 5);
 
     CuAssertIntEquals(tc, APR_EOF,
-                      serf_bucket_readline(limit, SERF_NEWLINE_ANY, &found,
+                      serf_bucket_readline(head, SERF_NEWLINE_ANY, &found,
                                            &data, &len));
     CuAssertIntEquals(tc, SERF_NEWLINE_NONE, found);
     CuAssertIntEquals(tc, len, 5); /* > 5 is over limit -> bug */
-    DRAIN_BUCKET(raw);
+    DRAIN_BUCKET(head);
+    DRAIN_BUCKET(tail);
+    serf_bucket_destroy(head);
+    serf_bucket_destroy(tail);
   }
 }
 
@@ -2790,6 +2830,7 @@ CuSuite *test_buckets(void)
     SUITE_ADD_TEST(suite, test_deflate_buckets);
     SUITE_ADD_TEST(suite, test_prefix_buckets);
     SUITE_ADD_TEST(suite, test_limit_buckets);
+    SUITE_ADD_TEST(suite, test_split_buckets);
     SUITE_ADD_TEST(suite, test_deflate_compress_buckets);
     SUITE_ADD_TEST(suite, test_http2_unframe_buckets);
     SUITE_ADD_TEST(suite, test_http2_unpad_buckets);

