Author: rhuijben
Date: Wed Nov  4 14:43:01 2015
New Revision: 1712557

URL: http://svn.apache.org/viewvc?rev=1712557&view=rev
Log:
Update the http2 framing bucket (which is responsible for creating frames)
to create just a single frame for the data passed to it. The updated
implementation avoids copying where possible, but will still do whatever is
needed to create the frame.

I decided that splitting data over multiple frames belongs in a different
layer, as the two cases where this is needed require different behavior anyway.

* buckets/copy_buckets.c
  (serf__copy_iovec): Make library private instead of static.

* buckets/http2_frame_buckets.c
  (serf_http2_frame_context_t): Simplify state.
  (serf__bucket_http2_frame_create): Remove the alloc_window and
    alloc_window_baton arguments. Limit max_payload_size to a uint32, as
    that is how http2 configures it and it has to fit in 24 bits anyway.
  (http2_prepare_frame): Try to determine the payload size without copying
    and only fall back if that doesn't work.

  (serf_http2_frame_read,
   serf_http2_frame_read_iovec): Verify that the payload didn't shrink or grow.
  (serf_http2_frame_peek): Simplify a bit.

* protocols/http2_buckets.h
  (serf__bucket_http2_frame_create): Remove some arguments.

* protocols/http2_protocol.c
  (serf__http2_protocol_init,
   http2_handle_ping,
   http2_handle_settings,
   serf_http2__enqueue_stream_reset): Update caller.
  (http2_handle_goaway,
   http2_process): Minor logging tweak.

* protocols/http2_stream.c
  (serf_http2__stream_setup_next_request): Update caller.

* serf_private.h
  (serf__copy_iovec): Declare here.

* test/test_buckets.c
  (test_http2_frame_bucket_basic): Update caller.
  (test_buckets): Register test_response_continue.

Modified:
    serf/trunk/buckets/copy_buckets.c
    serf/trunk/buckets/http2_frame_buckets.c
    serf/trunk/protocols/http2_buckets.h
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/protocols/http2_stream.c
    serf/trunk/serf_private.h
    serf/trunk/test/test_buckets.c

Modified: serf/trunk/buckets/copy_buckets.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/buckets/copy_buckets.c?rev=1712557&r1=1712556&r2=1712557&view=diff
==============================================================================
--- serf/trunk/buckets/copy_buckets.c (original)
+++ serf/trunk/buckets/copy_buckets.c Wed Nov  4 14:43:01 2015
@@ -58,10 +58,10 @@ serf_bucket_t *serf_bucket_copy_create(
     return serf_bucket_create(&serf_bucket_type_copy, allocator, ctx);
 }
 
-static void serf__copy_iovec(char *data,
-                             apr_size_t *copied,
-                             struct iovec *vecs,
-                             int vecs_used)
+void serf__copy_iovec(char *data,
+                      apr_size_t *copied,
+                      struct iovec *vecs,
+                      int vecs_used)
 {
     int i;
     apr_size_t sz = 0;

Modified: serf/trunk/buckets/http2_frame_buckets.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/buckets/http2_frame_buckets.c?rev=1712557&r1=1712556&r2=1712557&view=diff
==============================================================================
--- serf/trunk/buckets/http2_frame_buckets.c (original)
+++ serf/trunk/buckets/http2_frame_buckets.c Wed Nov  4 14:43:01 2015
@@ -610,14 +610,13 @@ typedef struct serf_http2_frame_context_
   serf_bucket_t *stream;
   serf_bucket_alloc_t *alloc;
   serf_bucket_t *chunk;
-  apr_status_t stream_status;
+  apr_size_t bytes_remaining;
   apr_size_t max_payload_size;
+
   apr_int32_t stream_id;
 
   unsigned char frametype;
   unsigned char flags;
-  char end_of_stream;
-  char end_of_headers;
   char created_frame;
 
   apr_int32_t *p_stream_id;
@@ -625,12 +624,6 @@ typedef struct serf_http2_frame_context_
   void (*stream_id_alloc)(void *baton, apr_int32_t *stream_id);
 
   apr_size_t current_window;
-  void *alloc_window_baton;
-  apr_int32_t (*alloc_window)(void *baton,
-                              unsigned char frametype,
-                              apr_int32_t stream_id,
-                              apr_size_t requested,
-                              int peek);
 
 } serf_http2_frame_context_t;
 
@@ -643,14 +636,7 @@ serf__bucket_http2_frame_create(serf_buc
                                                    void *baton,
                                                    apr_int32_t *stream_id),
                                 void *stream_id_baton,
-                                apr_size_t max_payload_size,
-                                apr_int32_t (*alloc_window)(
-                                                   void *baton,
-                                                   unsigned char frametype,
-                                                   apr_int32_t stream_id,
-                                                   apr_size_t requested,
-                                                   int peek),
-                                void *alloc_window_baton,
+                                apr_uint32_t max_payload_size,
                                 serf_bucket_alloc_t *alloc)
 {
   serf_http2_frame_context_t *ctx = serf_bucket_mem_alloc(alloc, sizeof(*ctx));
@@ -658,7 +644,6 @@ serf__bucket_http2_frame_create(serf_buc
   ctx->alloc = alloc;
   ctx->stream = stream;
   ctx->chunk = serf_bucket_aggregate_create(alloc);
-  ctx->stream_status = APR_SUCCESS;
   ctx->max_payload_size = max_payload_size;
   ctx->frametype = frame_type;
   ctx->flags = flags;
@@ -688,10 +673,7 @@ serf__bucket_http2_frame_create(serf_buc
     }
 
   ctx->current_window = 0;
-  ctx->alloc_window = alloc_window;
-  ctx->alloc_window_baton = alloc_window_baton;
-
-  ctx->end_of_stream = ctx->end_of_headers = ctx->created_frame = FALSE;
+  ctx->created_frame = FALSE;
 
   return serf_bucket_create(&serf_bucket_type__http2_frame, alloc, ctx);
 }
@@ -700,63 +682,169 @@ static apr_status_t
 http2_prepare_frame(serf_bucket_t *bucket)
 {
   serf_http2_frame_context_t *ctx = bucket->data;
-  struct iovec vecs[512];
   int vecs_used;
-  apr_size_t len;
-  unsigned char frame[FRAME_PREFIX_SIZE];
-  int i;
+  apr_uint64_t payload_remaining;
 
   if (ctx->created_frame)
     return APR_SUCCESS;
 
-  ctx->created_frame = TRUE;
+  /* How long will this frame be? */
+  if (!ctx->stream)
+    payload_remaining = 0;
+  else
+    payload_remaining = serf_bucket_get_remaining(ctx->stream);
 
-  if (ctx->stream)
+  if (payload_remaining != SERF_LENGTH_UNKNOWN
+      && payload_remaining > ctx->max_payload_size)
+    {
+      return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+    }
+  else if (payload_remaining != SERF_LENGTH_UNKNOWN)
     {
-      ctx->stream_status = serf_bucket_read_iovec(ctx->stream,
-                                                  ctx->max_payload_size,
-                                                  512, vecs, &vecs_used);
+      if (ctx->stream)
+        serf_bucket_aggregate_append(ctx->chunk, ctx->stream);
 
-      if (SERF_BUCKET_READ_ERROR(ctx->stream_status))
-        return ctx->stream_status;
+      ctx->stream = NULL; /* Now managed by aggregate */
     }
   else
     {
-      vecs_used = 0;
-      ctx->stream_status = APR_EOF;
+      /* Our payload doesn't know how long it is. Our only option
+         now is to create the actual data */
+      struct iovec vecs[512];
+      apr_status_t status;
+
+      status = serf_bucket_read_iovec(ctx->stream, ctx->max_payload_size,
+                                      512, vecs, &vecs_used);
+
+      if (SERF_BUCKET_READ_ERROR(status))
+        return status;
+      else if (APR_STATUS_IS_EOF(status))
+        {
+          /* OK, we got everything, let's put the data at the start of the
+             aggregate. */
+          serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_used);
+
+          /* Obtain the size now , to avoid problems when the bucket
+             doesn't know that it has nothing remaining*/
+          payload_remaining = serf_bucket_get_remaining(ctx->chunk);
+
+          /* Just add the stream behind the iovecs. This keeps the chunks
+              available exactly until they are no longer necessary */
+          serf_bucket_aggregate_append(ctx->chunk, ctx->stream);
+          ctx->stream = NULL; /* Managed by aggregate */
+
+          if (payload_remaining == SERF_LENGTH_UNKNOWN)
+           {
+             /* Should never happen:
+                Aggregate with only iovecs should know size */
+             return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+           }
+        }
+      else
+        {
+          /* Auch... worst case scenario, we have to copy the data. Luckily
+             we have an absolute limit after which we may error out */
+          apr_size_t total = 0;
+          char *data = serf_bucket_mem_alloc(bucket->allocator,
+                                             ctx->max_payload_size);
+
+          serf__copy_iovec(data, &total, vecs, vecs_used);
+
+          while (!APR_STATUS_IS_EOF(status)
+                 && total < ctx->max_payload_size)
+            {
+              apr_size_t read;
+              status = serf_bucket_read_iovec(ctx->stream,
+                                              ctx->max_payload_size - total + 
1,
+                                              512, vecs, &vecs_used);
+
+              if (SERF_BUCKET_READ_ERROR(status))
+                {
+                  serf_bucket_mem_free(bucket->allocator, data);
+                  return status;
+                }
+
+              serf__copy_iovec(data, &read, vecs, vecs_used);
+              total += read;
+
+              if (status && !APR_STATUS_IS_EOF(status))
+                {
+                  /* Checkpoint what we got now...
+
+                     Next time this function is called the buffer is read 
first and
+                     then continued from the original stream */
+                  serf_bucket_t *new_stream;
+                  new_stream = serf_bucket_aggregate_create(bucket->allocator);
+
+                  serf_bucket_aggregate_append(
+                      new_stream,
+                      serf_bucket_simple_own_create(data, total, 
bucket->allocator));
+
+                  serf_bucket_aggregate_append(new_stream, ctx->stream);
+                  ctx->stream = new_stream;
+
+                  return status;
+                }
+            }
+
+          if (total > ctx->max_payload_size)
+            {
+              /* The chunk is at least 1 byte bigger then allowed */
+              serf_bucket_mem_free(bucket->allocator, data);
+
+              return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+            }
+          else
+            {
+              /* Ok, we have what we need in our buffer */
+              serf_bucket_aggregate_append(
+                    ctx->chunk,
+                    serf_bucket_simple_own_create(data, total, 
bucket->allocator));
+              payload_remaining = total;
+
+              /* And we no longer need stream */
+              serf_bucket_destroy(ctx->stream);
+              ctx->stream = NULL;
+            }
+        }
     }
 
-  /* For this first version assume that everything fits in a single frame */
-  if (! APR_STATUS_IS_EOF(ctx->stream_status))
-    abort(); /* Not implemented yet */
 
-  if (ctx->stream_id < 0 && ctx->stream_id_alloc)
-    {
-      ctx->stream_id_alloc(ctx->stream_id_baton, ctx->p_stream_id);
-      ctx->stream_id = *ctx->p_stream_id;
-    }
 
-  len = 0;
-  for (i = 0; i < vecs_used; i++)
-    len += vecs[i].iov_len;
+  /* Ok, now we can construct the frame */
+  ctx->created_frame = TRUE;
+  {
+    unsigned char frame[FRAME_PREFIX_SIZE];
 
-  frame[0] = (len >> 16) & 0xFF;
-  frame[1] = (len >> 8) & 0xFF;
-  frame[2] = len & 0xFF;
-  frame[3] = ctx->frametype;
-  frame[4] = ctx->flags;
-  frame[5] = ((apr_uint32_t)ctx->stream_id >> 24) & 0x7F;
-  frame[6] = ((apr_uint32_t)ctx->stream_id >> 16) & 0xFF;
-  frame[7] = ((apr_uint32_t)ctx->stream_id >> 8) & 0xFF;
-  frame[8] = ctx->stream_id & 0xFF;
+    /* Allocate the streamid if there isn't one.
+       Once the streamid hits the wire it automatically closes all
+       unused identifiers < this value.
+     */
+    if (ctx->stream_id < 0 && ctx->stream_id_alloc)
+      {
+        ctx->stream_id_alloc(ctx->stream_id_baton, ctx->p_stream_id);
+        ctx->stream_id = *ctx->p_stream_id;
+      }
+
+    frame[0] = (payload_remaining >> 16) & 0xFF;
+    frame[1] = (payload_remaining >> 8) & 0xFF;
+    frame[2] = payload_remaining & 0xFF;
+    frame[3] = ctx->frametype;
+    frame[4] = ctx->flags;
+    frame[5] = ((apr_uint32_t)ctx->stream_id >> 24) & 0x7F;
+    frame[6] = ((apr_uint32_t)ctx->stream_id >> 16) & 0xFF;
+    frame[7] = ((apr_uint32_t)ctx->stream_id >> 8) & 0xFF;
+    frame[8] = ctx->stream_id & 0xFF;
 
-  serf_bucket_aggregate_append(ctx->chunk,
+    /* Put the frame before the data */
+    serf_bucket_aggregate_prepend(ctx->chunk,
               serf_bucket_simple_copy_create((const char *)&frame,
                                              FRAME_PREFIX_SIZE,
                                              ctx->alloc));
-  if (vecs_used > 0)
-    serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_used);
 
+    /* And set the amount of data that we verify will be read */
+    ctx->bytes_remaining = (apr_size_t)payload_remaining + FRAME_PREFIX_SIZE;
+  }
   return APR_SUCCESS;
 }
 
@@ -775,8 +863,24 @@ serf_http2_frame_read(serf_bucket_t *buc
 
   status = serf_bucket_read(ctx->chunk, requested, data, len);
 
+  if (!SERF_BUCKET_READ_ERROR(status))
+    {
+      if (*len > ctx->bytes_remaining)
+        {
+          /* Frame payload resized after the header was written */
+          return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+        }
+      ctx->bytes_remaining -= *len;
+    }
+
   if (APR_STATUS_IS_EOF(status))
-    return ctx->stream_status;
+    {
+      if (ctx->bytes_remaining > 0)
+        {
+          /* Frame payload resized after the header was written */
+          return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+        }
+    }
 
   return status;
 }
@@ -798,8 +902,30 @@ serf_http2_frame_read_iovec(serf_bucket_
   status = serf_bucket_read_iovec(ctx->chunk, requested, vecs_size, vecs,
                                   vecs_used);
 
+  if (!SERF_BUCKET_READ_ERROR(status))
+    {
+      apr_size_t len = 0;
+      int i;
+
+      for (i = 0; i < *vecs_used; i++)
+        len += vecs[i].iov_len;
+
+      if (len > ctx->bytes_remaining)
+        {
+          /* Frame resized after the header was written */
+          return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+        }
+      ctx->bytes_remaining -= len;
+    }
+
   if (APR_STATUS_IS_EOF(status))
-    return ctx->stream_status;
+    {
+      if (ctx->bytes_remaining > 0)
+        {
+          /* Frame payload resized after the header was written */
+          return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+        }
+    }
 
   return status;
 }
@@ -814,14 +940,12 @@ serf_http2_frame_peek(serf_bucket_t *buc
 
   status = http2_prepare_frame(bucket);
   if (status)
-    return status;
-
-  status = serf_bucket_peek(ctx->chunk, data, len);
-
-  if (APR_STATUS_IS_EOF(status))
-    return ctx->stream_status;
+    {
+      *len = 0;
+      return APR_SUCCESS;
+    }
 
-  return status;
+  return serf_bucket_peek(ctx->chunk, data, len);
 }
 
 static void

Modified: serf/trunk/protocols/http2_buckets.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_buckets.h?rev=1712557&r1=1712556&r2=1712557&view=diff
==============================================================================
--- serf/trunk/protocols/http2_buckets.h (original)
+++ serf/trunk/protocols/http2_buckets.h Wed Nov  4 14:43:01 2015
@@ -211,14 +211,7 @@ serf__bucket_http2_frame_create(serf_buc
                                       void *baton,
                                       apr_int32_t *stream_id),
                                 void *stream_id_baton,
-                                apr_size_t max_payload_size,
-                                apr_int32_t(*alloc_window)(
-                                      void *baton,
-                                      unsigned char frametype,
-                                      apr_int32_t stream_id,
-                                      apr_size_t requested,
-                                      int peek),
-                                void *alloc_window_baton,
+                                apr_uint32_t max_payload_size,
                                 serf_bucket_alloc_t *alloc);
 
 /* ==================================================================== */

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1712557&r1=1712556&r2=1712557&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Wed Nov  4 14:43:01 2015
@@ -236,9 +236,9 @@ void serf__http2_protocol_init(serf_conn
     serf_bucket_t *window_size;
 
     tmp = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_SETTINGS, 0,
-                                          NULL, NULL, NULL, /* Static id: 0*/
-                                          HTTP2_DEFAULT_MAX_FRAMESIZE,
-                                          NULL, NULL, conn->allocator);
+                                          NULL, NULL, NULL, /* stream: 0 */
+                                          h2->lr_max_framesize,
+                                          conn->allocator);
 
     serf_http2__enqueue_frame(h2, tmp, FALSE);
 
@@ -246,9 +246,9 @@ void serf__http2_protocol_init(serf_conn
     window_size = serf_bucket_create_numberv(conn->allocator, "4", 0x40000000);
     tmp = serf__bucket_http2_frame_create(window_size,
                                           HTTP2_FRAME_TYPE_WINDOW_UPDATE, 0,
-                                          NULL, NULL, NULL,
-                                          HTTP2_DEFAULT_MAX_FRAMESIZE,
-                                          NULL, NULL, conn->allocator);
+                                          NULL, NULL, NULL, /* stream: 0 */
+                                          h2->lr_max_framesize,
+                                          conn->allocator);
     serf_http2__enqueue_frame(h2, tmp, FALSE);
 
     h2->rl_window += 0x40000000; /* And update our own administration */
@@ -587,7 +587,6 @@ http2_handle_ping(void *baton,
                                                   HTTP2_FLAG_ACK,
                                                   NULL, NULL, NULL,
                                                   h2->lr_max_framesize,
-                                                  NULL, NULL,
                                                   h2->allocator),
                   TRUE /* pump */);
 
@@ -698,8 +697,8 @@ http2_handle_settings(void *baton,
                                     HTTP2_FRAME_TYPE_SETTINGS,
                                     HTTP2_FLAG_ACK,
                                     NULL, NULL, NULL,
-                                    HTTP2_DEFAULT_MAX_FRAMESIZE,
-                                    NULL, NULL, h2->allocator),
+                                    h2->lr_max_framesize,
+                                    h2->allocator),
                     TRUE);
 
   return APR_SUCCESS;
@@ -789,7 +788,7 @@ http2_handle_goaway(void *baton,
                                     len - HTTP2_GOWAWAY_DATA_SIZE);
 
       serf__log(loglevel, SERF_LOGHTTP2, h2->config,
-                "Received GOAWAY, last-stream=%d, error=%u: %s\n",
+                "Received GOAWAY, last-stream=0x%x, error=%u: %s\n",
                 last_streamid, error_code, goaway_text);
 
       serf_bucket_mem_free(h2->allocator, goaway_text);
@@ -797,7 +796,7 @@ http2_handle_goaway(void *baton,
   else
     {
       serf__log(loglevel, SERF_LOGHTTP2, h2->config,
-                "Received GOAWAY, last-stream=%d, error=%u.\n",
+                "Received GOAWAY, last-stream=0x%x, error=%u.\n",
                 last_streamid, error_code);
     }
 
@@ -971,7 +970,7 @@ http2_process(serf_http2_protocol_t *h2)
             }
 
           serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
-                    "Reading 0x%x frame, stream=%x, flags=0x%x\n",
+                    "Reading 0x%x frame, stream=0x%x, flags=0x%x\n",
                     frametype, sid, frameflags);
 
           /* If status is EOF then the frame doesn't have/declare a body */
@@ -1611,7 +1610,7 @@ serf_http2__enqueue_stream_reset(serf_ht
             serf__bucket_http2_frame_create(bkt,
                                             HTTP2_FRAME_TYPE_RST_STREAM,
                                             0, &streamid, NULL, NULL,
-                                            h2->lr_max_framesize, NULL, NULL,
+                                            h2->lr_max_framesize,
                                             h2->allocator),
             TRUE);
 }

Modified: serf/trunk/protocols/http2_stream.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1712557&r1=1712556&r2=1712557&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Wed Nov  4 14:43:01 2015
@@ -139,7 +139,7 @@ serf_http2__stream_setup_next_request(se
                                            serf_http2__allocate_stream_id,
                                            stream,
                                            HTTP2_DEFAULT_MAX_FRAMESIZE,
-                                           NULL, NULL, request->allocator);
+                                           request->allocator);
 
   serf_http2__enqueue_frame(stream->h2, hpack, TRUE);
 

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1712557&r1=1712556&r2=1712557&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov  4 14:43:01 2015
@@ -439,6 +439,13 @@ struct serf_connection_t {
 
 /*** Internal bucket functions ***/
 
+/* Copies all data contained in vecs to *data, optionally telling how much was
+   copied */
+void serf__copy_iovec(char *data,
+                      apr_size_t *copied,
+                      struct iovec *vecs,
+                      int vecs_used);
+
 /** Transform a response_bucket in-place into an aggregate bucket. Restore the
     status line and all headers, not just the body.
  

Modified: serf/trunk/test/test_buckets.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/test/test_buckets.c?rev=1712557&r1=1712556&r2=1712557&view=diff
==============================================================================
--- serf/trunk/test/test_buckets.c (original)
+++ serf/trunk/test/test_buckets.c Wed Nov  4 14:43:01 2015
@@ -2483,7 +2483,7 @@ static void test_http2_frame_bucket_basi
   body_in = SERF_BUCKET_SIMPLE_STRING("This is no config!", alloc);
   frame_in = serf__bucket_http2_frame_create(body_in, 99, 7, &exp_streamid,
                                              NULL, NULL,
-                                             16384, NULL, NULL, alloc);
+                                             16384, alloc);
   frame_out = serf__bucket_http2_unframe_create(frame_in, 16384, alloc);
 
   read_and_check_bucket(tc, frame_out, "This is no config!");
@@ -2532,6 +2532,7 @@ CuSuite *test_buckets(void)
     SUITE_ADD_TEST(suite, test_response_bucket_peek_at_headers);
     SUITE_ADD_TEST(suite, test_response_bucket_iis_status_code);
     SUITE_ADD_TEST(suite, test_response_bucket_no_reason);
+    SUITE_ADD_TEST(suite, test_response_continue);
     SUITE_ADD_TEST(suite, test_bucket_header_set);
     SUITE_ADD_TEST(suite, test_bucket_header_do);
     SUITE_ADD_TEST(suite, test_iovec_buckets);


Reply via email to