Author: rhuijben
Date: Sat Oct 31 19:53:57 2015
New Revision: 1711679

URL: http://svn.apache.org/viewvc?rev=1711679&view=rev
Log:
Checkpoint a rewrite of the prototype http2 protocol handler to a fully event
driven protocol handler.

This patch starts delegating reading of the headers and response to the stream
handler, which was already responsible for writing out the request.

* protocols/http2_protocol.c
  (http2_process): New prototype.
  (serf_http2_protocol_t): Store settings. Update state for event driven
    handling.
  (serf__http2_protocol_init): Update init.
  (setup_for_http2): Update caller.
  (serf_http2__enqueue_frame): Destroy bucket on error.

  (http2_handle_priority,
   http2_handle_promise,
   http2_handle_frame_reset,
   http2_handle_stream_window_update,
   http2_handle_connection_window_update,
   http2_handle_ping,
   http2_handle_ping_ack,
   http2_handle_settings,
   http2_handle_goaway): New functions handling specific frame data.

  (http2_handle_continuation,
   http2_end_of_frame,
   http2_bucket_processor): New callbacks for specific bucket callbacks.

  (http2_read): Rename to...
  (http2_process): ... this, and implements the logic in a completely different
    way.
  (http2_protocol_read): Update caller.

  (serf_http2__allocate_stream_id): Tweak for variable name updates.
  (serf_http2__stream_get): Resolve endless loop. Tweak varnames.
  (serf_http2__enqueue_stream_reset): New stub function.

* protocols/http2_protocol.h
  (SERF_H2_assert): New define.
  (HTTP2_DEFAULT_MAX_CONCURRENT,
   HTTP2_PRIORITY_DATA_SIZE,
   HTTP2_RST_DATA_SIZE,
   HTTP2_PROMISE_DATA_SIZE,
   HTTP2_PING_DATA_SIZE,
   HTTP2_GOWAWAY_DATA_SIZE,
   HTTP2_WINDOW_UPDATE_DATA_SIZE,
   HTTP2_SETTING_SIZE): New defines for magic values.

  (serf_http2_stream_data_t): New typedef.
  (serf_http2_stream_t): Hide some data inside serf_http2_stream_data_t.
  (serf_http2_processor_t): New typedef.
  (serf_http2__enqueue_stream_reset): New function.
  (serf_http2__stream_get_by_id): Rename to...
  (serf_http2__stream_get): ... this to match implementation.

  (serf_http2__stream_reset,
   serf_http2__stream_handle_hpack,
   serf_http2__stream_handle_data,
   serf_http2__stream_processor): New functions.

* protocols/http2_stream.c
  (serf_http2_stream_data_t): New struct.
  (serf_http2__stream_create): Init data struct.
  (serf_http2__stream_cleanup): Cleanup data struct.
  (serf_http2__stream_setup_request): Update caller.

  (serf_http2__stream_reset,
   stream_response_eof,
   serf_http2__stream_handle_hpack,
   serf_http2__stream_handle_data,
   serf_http2__stream_processor): New function.

Modified:
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/protocols/http2_protocol.h
    serf/trunk/protocols/http2_stream.c

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1711679&r1=1711678&r2=1711679&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Sat Oct 31 19:53:57 2015
@@ -43,6 +43,9 @@ http2_protocol_hangup(serf_connection_t
 static void
 http2_protocol_teardown(serf_connection_t *conn);
 
+static apr_status_t
+http2_process(serf_http2_protocol_t *h2);
+
 static serf_bucket_t *
 serf_bucket_create_numberv(serf_bucket_alloc_t *allocator, const char *format, 
...)
 {
@@ -132,26 +135,42 @@ struct serf_http2_protocol_t
   apr_pool_t *pool;
   serf_connection_t *conn;
   serf_bucket_t *ostream;
+  serf_bucket_alloc_t *allocator;
 
-  serf_hpack_table_t *hpack_tbl;
+  serf_http2_processor_t processor;
+  void *processor_baton;
+  serf_bucket_t *read_frame;   /* Frame currently being read */
+  int in_frame;
 
-  apr_uint32_t default_lr_window;
-  apr_uint32_t default_rl_window;
+  serf_hpack_table_t *hpack_tbl;
+  serf_config_t *config;
 
-  apr_int64_t lr_window; /* local->remote */
-  apr_int64_t rl_window; /* remote->local */
-  apr_int32_t next_local_streamid;
-  apr_int32_t next_remote_streamid;
+  /* Local -> Remote. Settings provided by other side */
+  apr_uint32_t lr_default_window;
+  apr_uint32_t lr_window;
+  apr_uint32_t lr_max_framesize;
+  apr_uint32_t lr_max_headersize;
+  apr_uint32_t lr_max_concurrent;
+  apr_int32_t lr_next_streamid;
+  char lr_push_enabled;
+
+  /* Remote -> Local. Settings set by us. Acknowledged by other side */
+  apr_uint32_t rl_default_window;
+  apr_uint32_t rl_window;
+  apr_uint32_t rl_max_framesize;
+  apr_uint32_t rl_max_headersize;
+  apr_uint32_t rl_max_concurrent;
+  apr_int32_t rl_next_streamid;
+  char rl_push_enabled;
 
   serf_http2_stream_t *first;
   serf_http2_stream_t *last;
 
-  char buffer[HTTP2_DEFAULT_MAX_FRAMESIZE];
-  apr_size_t buffer_used;
-  serf_bucket_t *cur_frame;
-  serf_bucket_t *cur_payload;
-  int in_payload;
+  int setting_acks;
+  int enforce_flow_control;
 
+  serf_bucket_t *continuation_bucket;
+  apr_int32_t continuation_streamid;
 };
 
 static apr_status_t
@@ -179,6 +198,7 @@ void serf__http2_protocol_init(serf_conn
   serf_http2_protocol_t *ctx;
   apr_pool_t *protocol_pool;
   serf_bucket_t *tmp;
+  const int WE_ARE_CLIENT = 1;
 
   apr_pool_create(&protocol_pool, conn->pool);
 
@@ -186,15 +206,30 @@ void serf__http2_protocol_init(serf_conn
   ctx->pool = protocol_pool;
   ctx->conn = conn;
   ctx->ostream = conn->ostream_tail;
+  ctx->allocator = conn->allocator;
+  ctx->config = conn->config;
 
   /* Defaults until negotiated */
-  ctx->default_lr_window = HTTP2_DEFAULT_WINDOW_SIZE;
-  ctx->default_rl_window = HTTP2_DEFAULT_WINDOW_SIZE;
-
-  ctx->lr_window = ctx->default_lr_window;
-  ctx->rl_window = ctx->default_rl_window;
-  ctx->next_local_streamid = 1; /* 2 if we would be the server */
-  ctx->next_remote_streamid = 2; /* 1 if we would be the client */
+  ctx->rl_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
+  ctx->rl_window = HTTP2_DEFAULT_WINDOW_SIZE;
+  ctx->rl_next_streamid = WE_ARE_CLIENT ? 2 : 1;
+  ctx->rl_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
+  ctx->rl_max_headersize = APR_UINT32_MAX;
+  ctx->rl_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
+  ctx->rl_push_enabled = TRUE;
+
+  ctx->lr_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
+  ctx->lr_window = HTTP2_DEFAULT_WINDOW_SIZE;
+  ctx->lr_next_streamid = WE_ARE_CLIENT ? 1 : 2;
+  ctx->lr_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
+  ctx->lr_max_headersize = APR_UINT32_MAX;
+  ctx->lr_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
+  ctx->lr_push_enabled = TRUE;
+
+  ctx->setting_acks = 0;
+  ctx->enforce_flow_control = TRUE;
+  ctx->continuation_bucket = NULL;
+  ctx->continuation_streamid = 0;
 
   ctx->first = ctx->last = NULL;
 
@@ -253,9 +288,9 @@ setup_for_http2(serf_http2_protocol_t *h
   serf_http2_stream_t *stream;
 
   stream = serf_http2__stream_create(h2, -1,
-                                     h2->default_lr_window,
-                                     h2->default_rl_window,
-                                     h2->conn->allocator);
+                                     h2->lr_default_window,
+                                     h2->rl_default_window,
+                                     h2->allocator);
 
   if (h2->first)
     {
@@ -287,7 +322,10 @@ serf_http2__enqueue_frame(serf_http2_pro
       status = serf_bucket_peek(h2->ostream, &data, &len);
 
       if (SERF_BUCKET_READ_ERROR(status))
-        return status;
+        {
+          serf_bucket_destroy(frame);
+          return status; 
+        }
 
       if (len == 0)
         {
@@ -322,178 +360,871 @@ serf_http2__enqueue_frame(serf_http2_pro
   return APR_SUCCESS;
 }
 
+/* Implements serf_bucket_prefix_handler_t.
+   Handles PRIORITY frames and the priority prefix of HEADERS frames */
+static apr_status_t
+http2_handle_priority(void *baton,
+                      serf_bucket_t *bucket,
+                      const char *data,
+                      apr_size_t len)
+{
+  serf_http2_stream_t *stream = baton;
+
+  if (len != HTTP2_PRIORITY_DATA_SIZE)
+    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+  if (stream == NULL)
+      return APR_SUCCESS; /* Nothing to record this on */
 
+  /* ### TODO: Store priority information on stream */
+  SERF_H2_assert(stream->h2 != NULL);
+
+  return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+   Handles the promise prefix of PUSH_PROMISE frames */
 static apr_status_t
-http2_read(serf_connection_t *conn)
+http2_handle_promise(void *baton,
+                     serf_bucket_t *bucket,
+                     const char *data,
+                     apr_size_t len)
 {
-  serf_http2_protocol_t *ctx = conn->protocol_baton;
-  apr_status_t status = APR_SUCCESS;
+  serf_http2_stream_t *stream = baton;
 
-  while (TRUE)
-    {
-      status = APR_SUCCESS;
+  if (len != HTTP2_PROMISE_DATA_SIZE)
+    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
 
-      if (ctx->cur_frame)
-        {
-          const char *data;
-          apr_size_t len;
+  /* ### TODO: Prepare reading promise on stream */
+  SERF_H2_assert(stream->h2 != NULL);
 
-          if (! ctx->in_payload)
-            {
-              unsigned char flags;
-              unsigned char frametype;
-              apr_int32_t streamid;
-              apr_uint64_t size;
-
-              status = serf__bucket_http2_unframe_read_info(ctx->cur_frame,
-                                                            &streamid, 
&frametype,
-                                                            &flags);
+  return APR_SUCCESS;
+}
 
-              if (status && !APR_STATUS_IS_EOF(status))
-                break;
+/* Implements serf_bucket_prefix_handler_t.
+   Handles the promise prefix of FRAME_RSET frames */
+static apr_status_t
+http2_handle_frame_reset(void *baton,
+                         serf_bucket_t *bucket,
+                         const char *data,
+                         apr_size_t len)
+{
+  serf_http2_stream_t *stream = baton;
 
-              size = serf_bucket_get_remaining(ctx->cur_frame);
+  if (len != HTTP2_RST_DATA_SIZE)
+    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
 
-              serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
-                        "Start 0x%02x http2 frame on stream 0x%x, flags=0x%x, 
size=0x%x\n",
-                        (int)frametype, (int)streamid, (int)flags, (int)size);
-
-              ctx->in_payload = TRUE;
-
-              if (flags & HTTP2_FLAG_PADDED)
-                {
-                  ctx->cur_payload =
-                        serf__bucket_http2_unpad_create(
-                              ctx->cur_frame, TRUE,
-                              ctx->cur_frame->allocator);
-                }
-              else
-                ctx->cur_payload = ctx->cur_frame;
+  SERF_H2_assert(stream->h2 != NULL);
 
-              if (frametype == HTTP2_FRAME_TYPE_HEADERS)
-                {
-                  ctx->cur_payload = serf__bucket_hpack_decode_create(
-                                            ctx->cur_payload,
-                                            NULL, NULL,
-                                            16384, ctx->hpack_tbl,
-                                            ctx->cur_frame->allocator);
-                }
-            }
+  /* ### TODO: Handle error code, etc. */
+  stream->status = H2S_CLOSED;
 
-          status = serf_bucket_read(ctx->cur_payload,
-                                    sizeof(ctx->buffer) - ctx->buffer_used,
-                                    &data, &len);
+  return APR_SUCCESS;
+}
 
-          if (SERF_BUCKET_READ_ERROR(status))
-            break;
+/* Implements serf_bucket_prefix_handler_t.
+   Handles WINDOW_UPDATE frames when they apply to a stream */
+static apr_status_t
+http2_handle_stream_window_update(void *baton,
+                                  serf_bucket_t *bucket,
+                                  const char *data,
+                                  apr_size_t len)
+{
+  serf_http2_stream_t *stream = baton;
 
-          if (len)
-            {
-              memcpy(&ctx->buffer[ctx->buffer_used], data, len);
-              ctx->buffer_used += len;
-            }
+  if (len != HTTP2_WINDOW_UPDATE_DATA_SIZE)
+    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
 
-          if (APR_STATUS_IS_EOF(status))
-            {
-              apr_int32_t streamid;
-              unsigned char frametype;
-              unsigned char flags;
-
-              serf__bucket_http2_unframe_read_info(ctx->cur_frame,
-                                                   &streamid, &frametype,
-                                                   &flags);
-              serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
-                        "Done 0x%02x http2 frame on stream 0x%x, flags=0x%x, 
size=0x%x\n",
-                        (int)frametype, (int)streamid, (int)flags, 
(int)ctx->buffer_used);
-
-              if (frametype == HTTP2_FRAME_TYPE_DATA
-                  || frametype == HTTP2_FRAME_TYPE_HEADERS)
-                {
-                  /* Ugly hack to dump body. Memory LEAK! */
-                  serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
-                            "%s\n", apr_pstrmemdup(conn->pool, ctx->buffer, 
ctx->buffer_used));
-                }
-
-              if (frametype == HTTP2_FRAME_TYPE_GOAWAY && conn)
-                serf__log(LOGLVL_WARNING, SERF_LOGHTTP2, conn->config,
-                          "Go away reason %d: %s\n", ctx->buffer[7],
-                                                     apr_pstrmemdup(conn->pool,
-                                                               &ctx->buffer[8],
-                                                               
(ctx->buffer_used >= 8)
-                                                               ? 
ctx->buffer_used-8 : 0));
-
-              if (frametype == HTTP2_FRAME_TYPE_RST_STREAM && conn)
-                serf__log(LOGLVL_WARNING, SERF_LOGHTTP2, conn->config,
-                          "Reset reason %d: %s\n", ctx->buffer[7],
-                          apr_pstrmemdup(conn->pool,
-                                         &ctx->buffer[8],
-                                         (ctx->buffer_used >= 8)
-                                         ? ctx->buffer_used - 8 : 0));
-
-              if (frametype == HTTP2_FRAME_TYPE_SETTINGS
-                  && !(flags & HTTP2_FLAG_ACK))
-                {
-                  /* Always ack settings */
-                  serf_http2__enqueue_frame(
-                    ctx,
+  SERF_H2_assert(stream->h2 != NULL);
+
+  return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+   Handles WINDOW_UPDATE frames when they apply to the connection */
+static apr_status_t
+http2_handle_connection_window_update(void *baton,
+                                      serf_bucket_t *bucket,
+                                      const char *data,
+                                      apr_size_t len)
+{
+  serf_http2_protocol_t *h2 = baton;
+
+  if (len != HTTP2_WINDOW_UPDATE_DATA_SIZE)
+    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+  SERF_H2_assert(h2 != NULL);
+
+  return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+   Handles PING frames for pings initiated remotely */
+static apr_status_t
+http2_handle_ping(void *baton,
+                  serf_bucket_t *bucket,
+                  const char *data,
+                  apr_size_t len)
+{
+  serf_http2_protocol_t *h2 = baton;
+  serf_bucket_t *body;
+  apr_status_t status;
+
+  if (len != HTTP2_PING_DATA_SIZE)
+    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+  SERF_H2_assert(h2 != NULL);
+
+  /* Reply with a PONG (=PING + ACK) with the same data*/
+
+  body = serf_bucket_simple_copy_create(data, len,
+                                        h2->allocator);
+
+  status = serf_http2__enqueue_frame(
+                  h2,
+                  serf__bucket_http2_frame_create(body,
+                                                  HTTP2_FRAME_TYPE_PING,
+                                                  HTTP2_FLAG_ACK,
+                                                  NULL, NULL, NULL,
+                                                  h2->lr_max_framesize,
+                                                  NULL, NULL,
+                                                  h2->allocator),
+                  TRUE /* pump */);
+
+  if (SERF_BUCKET_READ_ERROR(status))
+    return status;
+
+  return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+   Handles PING frames for pings initiated locally */
+static apr_status_t
+http2_handle_ping_ack(void *baton,
+                      serf_bucket_t *bucket,
+                      const char *data,
+                      apr_size_t len)
+{
+  serf_http2_protocol_t *h2 = baton;
+  if (len != HTTP2_PING_DATA_SIZE)
+    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+  SERF_H2_assert(h2 != NULL);
+
+  /* Did we send a ping? */
+
+  return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+   Handles SETTINGS frames */
+static apr_status_t
+http2_handle_settings(void *baton,
+                      serf_bucket_t *bucket,
+                      const char *data,
+                      apr_size_t len)
+{
+  serf_http2_protocol_t *h2 = baton;
+  apr_size_t i;
+  const struct setting_t
+  {
+    unsigned char s1, s0;
+    unsigned char v3, v2, v1, v0;
+  } *setting;
+
+  if ((len % HTTP2_SETTING_SIZE) != 0)
+    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+  /* ### TODO: Handle settings */
+  setting = (const void *)data;
+  for (i = 0, setting = (const void *)data;
+       i < len;
+       i += sizeof(*setting), setting++)
+    {
+      apr_uint16_t id = (setting->s1 << 8) | setting->s0;
+      apr_uint32_t value = (setting->v3 << 24) | (setting->v2 << 16)
+                           | (setting->v1 << 8) | setting->v0;
+
+      switch (id)
+        {
+          case HTTP2_SETTING_HEADER_TABLE_SIZE:
+            /* TODO: Send to hpack table */
+            serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+                      "Setting HPACK Table size to %u\n", value);
+            break;
+          case HTTP2_SETTING_ENABLE_PUSH:
+            serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+                      "Setting Push enabled: %u\n", value);
+            h2->lr_push_enabled = (value != 0);
+            break;
+          case HTTP2_SETTING_MAX_CONCURRENT_STREAMS:
+            serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+                      "Setting Max Concurrent %u\n", value);
+            h2->lr_max_concurrent = value;
+            break;
+          case HTTP2_SETTING_INITIAL_WINDOW_SIZE:
+            /* Sanitize? */
+            serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+                      "Setting Initial Window Size %u\n", value);
+            h2->lr_default_window = value;
+            break;
+          case HTTP2_SETTING_MAX_FRAME_SIZE:
+            /* Sanitize? */
+            serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+                      "Setting Max framesize %u\n", value);
+            h2->lr_max_framesize = value;
+            break;
+          case HTTP2_SETTING_MAX_HEADER_LIST_SIZE:
+            serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+                      "Setting Max header list size %u\n", value);
+            h2->lr_max_headersize = value;
+            break;
+          default:
+            /* An endpoint that receives a SETTINGS frame with any unknown
+               or unsupported identifier MUST ignore that setting. */
+            serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+                      "Ignoring unknown setting %d, value %u\n", id, value);
+            break;
+        }
+    }
+
+  /* Always ack settings */
+  serf_http2__enqueue_frame(
+                    h2,
                     serf__bucket_http2_frame_create(
                                     NULL,
                                     HTTP2_FRAME_TYPE_SETTINGS,
                                     HTTP2_FLAG_ACK,
                                     NULL, NULL, NULL,
                                     HTTP2_DEFAULT_MAX_FRAMESIZE,
-                                    NULL, NULL, conn->allocator),
-                    TRUE);
-                }
-              else if (frametype == HTTP2_FRAME_TYPE_DATA)
-                {
-                  /* Provide a bit of window space to the server after 
-                     receiving data */
-                  serf_http2__enqueue_frame(
-                    ctx,
-                    serf__bucket_http2_frame_create(
-                      serf_bucket_create_numberv(conn->allocator, "4", 
(apr_int32_t)16384),
-                              HTTP2_FRAME_TYPE_WINDOW_UPDATE, 0,
-                              &streamid, NULL, NULL,
-                              HTTP2_DEFAULT_MAX_FRAMESIZE,
-                              NULL, NULL, conn->allocator),
+                                    NULL, NULL, h2->allocator),
                     TRUE);
-                }
-              else if (frametype == HTTP2_FRAME_TYPE_PING)
-                {
-                  /* TODO: PONG (=Ping Ack) */
-                }
-
-              serf_bucket_destroy(ctx->cur_payload);
-              ctx->cur_frame = ctx->cur_payload = NULL;
-              ctx->in_payload = FALSE;
-              ctx->buffer_used = 0;
+
+  return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+   Handles GOAWAY frames */
+static apr_status_t
+http2_handle_goaway(void *baton,
+                    serf_bucket_t *bucket,
+                    const char *data,
+                    apr_size_t len)
+{
+  serf_http2_protocol_t *h2 = baton;
+
+  SERF_H2_assert(h2 != NULL);
+
+  return APR_SUCCESS;
+}
+
+
+/* Implements serf_bucket_aggregate_eof_t */
+static apr_status_t
+http2_handle_continuation(void *baton,
+                          serf_bucket_t *aggregate_bucket)
+{
+  serf_http2_protocol_t *h2 = baton;
+  apr_status_t status;
+  const char *data;
+  apr_size_t len;
+
+  if (h2->continuation_bucket != aggregate_bucket)
+    return APR_EOF; /* This is all we have */
+
+  SERF_H2_assert(h2->read_frame == NULL);
+  SERF_H2_assert(h2->continuation_bucket == aggregate_bucket);
+
+  status = http2_process(h2);
+  if (status)
+    return status;
+
+  if (h2->continuation_bucket == aggregate_bucket)
+    {
+      /* We expect more data in the future. Something
+         was done in http2_process() or it didn't
+         return APR_SUCCESS */
+      return APR_SUCCESS;
+    }
+
+  /* As h2->continuation_bucket is no longer attached we don't
+     recurse on peeking. Just check if there is more */
+  return serf_bucket_peek(aggregate_bucket, &data, &len);
+}
+
+/* Implements the serf__bucket_http2_unframe_set_eof callback */
+static apr_status_t
+http2_end_of_frame(void *baton,
+                   serf_bucket_t *frame)
+{
+  serf_http2_protocol_t *h2 = baton;
+
+  SERF_H2_assert(h2->read_frame == frame);
+  h2->read_frame = NULL;
+  h2->in_frame = FALSE;
+  h2->processor = NULL;
+  h2->processor_baton = NULL;
+
+  return APR_SUCCESS;
+}
+
+/* Implements serf_http2_processor_t */
+static apr_status_t
+http2_bucket_processor(void *baton,
+                       serf_http2_protocol_t *h2,
+                       serf_bucket_t *frame_bucket)
+{
+  struct iovec vecs[16];
+  int vecs_used;
+  serf_bucket_t *payload = baton;
+  apr_status_t status;
+
+  status = serf_bucket_read_iovec(payload, SERF_READ_ALL_AVAIL, 16,
+                                  vecs, &vecs_used);
+
+  if (APR_STATUS_IS_EOF(status))
+    {
+      SERF_H2_assert(!h2->in_frame && !h2->read_frame);
+      serf_bucket_destroy(payload);
+    }
+
+  return status;
+}
+
+/* Processes incoming HTTP2 data */
+static apr_status_t
+http2_process(serf_http2_protocol_t *h2)
+{
+  while (TRUE)
+    {
+      apr_status_t status;
+      serf_bucket_t *body;
+
+      if (h2->processor)
+        {
+          status = h2->processor(h2->processor_baton, h2, h2->read_frame);
+
+          if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+          else if (APR_STATUS_IS_EOF(status))
+            {
+              /* ### frame ended */
+              SERF_H2_assert(h2->read_frame == NULL);
+              h2->processor = NULL;
+              h2->processor_baton = NULL;
             }
-          else
-            continue;
+          else if (h2->in_frame)
+            {
+              if (status)
+                return status;
+              else
+                continue;
+            }
+        }
+      else
+        {
+          SERF_H2_assert(!h2->in_frame);
         }
 
-      if (APR_STATUS_IS_EOF(status))
+      body = h2->read_frame;
+
+      if (! body)
         {
-          const char *data;
-          apr_size_t len;
-          status = serf_bucket_peek(conn->stream, &data, &len);
+          SERF_H2_assert(!h2->in_frame);
+
+          body = serf__bucket_http2_unframe_create(
+                                             h2->conn->stream, FALSE,
+                                             h2->rl_max_framesize,
+                                             h2->allocator);
+
+          serf__bucket_http2_unframe_set_eof(body,
+                                             http2_end_of_frame, h2);
 
-          if (SERF_BUCKET_READ_ERROR(status)
-              || APR_STATUS_IS_EOF(status))
+          serf_bucket_set_config(body, h2->config);
+          h2->read_frame = body;
+        }
+
+      if (! h2->in_frame)
+        {
+          apr_int32_t sid;
+          unsigned char frametype;
+          unsigned char frameflags;
+          apr_size_t remaining;
+          serf_http2_processor_t process_handler = NULL;
+          void *process_baton = NULL;
+          serf_bucket_t *process_bucket = NULL;
+          serf_http2_stream_t *stream;
+          apr_uint32_t reset_reason;
+
+          status = serf__bucket_http2_unframe_read_info(body, &sid,
+                                                        &frametype, 
&frameflags);
+
+          if (APR_STATUS_IS_EOF(status))
             {
-              /* We have a real EOF*/
-              break;
+              /* Entire frame is already read (just header) */
+              SERF_H2_assert(h2->read_frame == NULL);
+              SERF_H2_assert(! h2->in_frame);
+            }
+          else if (status)
+            {
+              SERF_H2_assert(h2->read_frame != NULL);
+              SERF_H2_assert(! h2->in_frame);
+              return status;
+            }
+          else
+            {
+              h2->in_frame = TRUE;
+              SERF_H2_assert(h2->read_frame != NULL);
             }
-        }
 
-      ctx->cur_frame = ctx->cur_payload =
-            serf__bucket_http2_unframe_create(conn->stream, FALSE,
-                                              HTTP2_DEFAULT_MAX_FRAMESIZE,
-                                              conn->stream->allocator);
-    }
+          serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+                    "Reading 0x%x frame, stream=%x, flags=0x%x\n",
+                    frametype, sid, frameflags);
 
-  return status;
+          /* If status is EOF then the frame doesn't have/declare a body */
+          switch (frametype)
+            {
+      /* ---------------------------------------------------- */
+              case HTTP2_FRAME_TYPE_DATA:
+              case HTTP2_FRAME_TYPE_HEADERS:
+              case HTTP2_FRAME_TYPE_PUSH_PROMISE:
+                if (h2->continuation_bucket)
+                  {
+                    h2->continuation_bucket = NULL;
+                    h2->continuation_streamid = 0;
+                    return APR_EAGAIN;
+                  }
+
+                stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);
+
+                if (sid == 0)
+                  {
+                    /* DATA, HEADERS and PUSH_PROMISE:
+
+                      These frames MUST be associated with a stream.  If a
+                      XXX frame is received whose stream identifier field is 
0x0,
+                      the recipient MUST respond with a connection error
+                      (Section 5.4.1) of type PROTOCOL_ERROR. */
+                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+                  }
+
+                reset_reason = 0;
+
+                if (frametype == HTTP2_FRAME_TYPE_DATA)
+                  {
+                    remaining = (apr_size_t)serf_bucket_get_remaining(body);
+
+                    if (h2->rl_window < remaining)
+                      {
+                        if (h2->enforce_flow_control)
+                          reset_reason = SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR;
+
+                        h2->rl_window = 0;
+                      }
+                    else
+                      h2->rl_window -= remaining;
+
+                    if (stream)
+                      {
+                        if (stream->rl_window < remaining)
+                          {
+                            if (h2->enforce_flow_control)
+                              reset_reason = 
SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR;
+
+                            stream->rl_window = 0;
+                          }
+                        else
+                          stream->rl_window -= remaining;
+                      }
+                  }
+
+                /* DATA, HEADERS and PUSH_PROMISE can have padding */
+                if (frameflags & HTTP2_FLAG_PADDED)
+                  {
+                    body = serf__bucket_http2_unpad_create(body, TRUE,
+                                                           h2->allocator);
+                  }
+
+                /* An HEADERS frame can have an included priority 'frame' */
+                if (frametype == HTTP2_FRAME_TYPE_HEADERS
+                                        && (frameflags & HTTP2_FLAG_PRIORITY))
+                  {
+                    body = serf_bucket_prefix_create(body,
+                                                     HTTP2_PRIORITY_DATA_SIZE,
+                                                     http2_handle_priority,
+                                                     stream, h2->allocator);
+                  }
+
+                if (frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE)
+                  {
+                    body = serf_bucket_prefix_create(body,
+                                                     HTTP2_PROMISE_DATA_SIZE,
+                                                     http2_handle_promise,
+                                                     stream, h2->allocator);
+                  }
+
+                if (!stream)
+                  {
+                    if (!reset_reason)
+                      reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
+                  }
+                else
+                  switch (frametype)
+                    {
+                      case HTTP2_FRAME_TYPE_DATA:
+                        if (stream->status != H2S_OPEN
+                            && stream->status != H2S_HALFCLOSED_LOCAL)
+                          {
+                            reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
+                          }
+                        break;
+                      case HTTP2_FRAME_TYPE_HEADERS:
+                        if (stream->status != H2S_IDLE
+                            && stream->status != H2S_RESERVED_LOCAL
+                            && stream->status != H2S_OPEN
+                            && stream->status != H2S_HALFCLOSED_REMOTE)
+                          {
+                            reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
+                          }
+                        break;
+                      case HTTP2_FRAME_TYPE_PUSH_PROMISE:
+                        if (stream->status != H2S_OPEN
+                            && stream->status != H2S_HALFCLOSED_REMOTE)
+                          {
+                            reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
+                          }
+                        break;
+                    }
+
+                if (reset_reason)
+                  {
+                    if (stream)
+                      serf_http2__stream_reset(stream, reset_reason, TRUE);
+                    else
+                      serf_http2__enqueue_stream_reset(h2, sid, reset_reason);
+                  }
+
+                if (frametype == HTTP2_FRAME_TYPE_HEADERS
+                    || frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE)
+                  {
+                    if (!(frameflags & HTTP2_FLAG_END_HEADERS))
+                      {
+                        /* This header frame is *directly* followed by
+                           continuation frames... We hide this from the
+                           stream code, by providing an aggregate that will
+                           read through the body of multiple frames */
+
+                        h2->continuation_bucket = serf_bucket_aggregate_create(
+                                                          h2->allocator);
+                        h2->continuation_streamid = sid;
+
+                        serf_bucket_aggregate_append(h2->continuation_bucket,
+                                                     body);
+
+                        serf_bucket_aggregate_hold_open(
+                                      h2->continuation_bucket,
+                                      http2_handle_continuation, h2);
+                      }
+
+                    if (stream && !reset_reason)
+                      {
+                        body = serf_http2__stream_handle_hpack(
+                                          stream, body, frametype,
+                                          (frameflags & HTTP2_FLAG_END_STREAM),
+                                          h2->rl_max_headersize,
+                                          h2->hpack_tbl, h2->config,
+                                          h2->allocator);
+                      }
+                    else
+                      {
+                        /* Even when we don't want to process the headers we
+                            must read them to update the HPACK state */
+                        body = serf__bucket_hpack_decode_create(
+                                          body, NULL, NULL, 
h2->rl_max_headersize,
+                                          h2->hpack_tbl, h2->allocator);
+                      }
+                  }
+                else /* We have a data bucket */
+                  {
+                    body = serf_http2__stream_handle_data(
+                                        stream, body, frametype,
+                                        (frameflags & HTTP2_FLAG_END_STREAM),
+                                        h2->config, h2->allocator);
+                  }
+
+                if (body)
+                  process_bucket = body; /* We will take care of discarding */
+                else
+                  {
+                    /* The stream wants to handle the reading itself */
+                    process_handler = serf_http2__stream_processor;
+                    process_baton = stream;
+                  }
+                break;
+
+      /* ---------------------------------------------------- */
+              case HTTP2_FRAME_TYPE_PRIORITY:
+                if (sid == 0)
+                  {
+                    /* The PRIORITY frame always identifies a stream.  If a
+                       PRIORITY frame is received with a stream identifier of
+                       0x0, the recipient MUST respond with a connection error
+                       (Section 5.4.1) of type PROTOCOL_ERROR.*/
+
+                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+                  }
+                else if (serf_bucket_get_remaining(body)
+                                                  != HTTP2_PRIORITY_DATA_SIZE)
+                  {
+                    /* A PRIORITY frame with a length other than 5 octets MUST
+                       be treated as a stream error (Section 5.4.2) of type
+                       FRAME_SIZE_ERROR.*/
+
+                    /* ### But we currently upgrade this to a connection error 
*/
+                    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+                  }
+
+                stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);
+
+                if (stream)
+                  {
+                    body = serf_bucket_prefix_create(body,
+                                                     HTTP2_PRIORITY_DATA_SIZE,
+                                                     http2_handle_priority,
+                                                     stream, h2->allocator);
+                  }
+
+                /* Just reading will do the right thing now */
+                process_bucket = body;
+                break;
+
+      /* ---------------------------------------------------- */
+              case HTTP2_FRAME_TYPE_RST_STREAM:
+                if (sid == 0)
+                  {
+                    /* RST_STREAM frames MUST be associated with a stream.
+                       If a RST_STREAM frame is received with a stream
+                       identifier of 0x0, the recipient MUST treat this as a
+                       connection error (Section 5.4.1) of type PROTOCOL_ERROR.
+                     */
+
+                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+                  }
+                else if (serf_bucket_get_remaining(body)
+                                                  != HTTP2_RST_DATA_SIZE)
+                  {
+                    /* A RST_STREAM frame with a length other than 4 octets 
MUST
+                       be treated as a connection error (Section 5.4.1) of type
+                       FRAME_SIZE_ERROR. */
+
+                    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+                  }
+
+                stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);
+
+                if (stream)
+                  {
+                    body = serf_bucket_prefix_create(body,
+                                                     
HTTP2_FRAME_TYPE_RST_STREAM,
+                                                     http2_handle_frame_reset,
+                                                     stream, h2->allocator);
+                  }
+
+                /* Just reading will do the right thing now */
+                process_bucket = body;
+                break;
+
+      /* ---------------------------------------------------- */
+              case HTTP2_FRAME_TYPE_SETTINGS:
+                if (sid != 0)
+                  {
+                    /* SETTINGS frames always apply to a connection, never a
+                       single stream. The stream identifier for a SETTINGS
+                       frame MUST be zero (0x0).  If an endpoint receives a
+                       SETTINGS frame whose stream identifier field is
+                       anything other than 0x0, the endpoint MUST respond
+                       with a connection error (Section 5.4.1) of type
+                       PROTOCOL_ERROR.
+                    */
+                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+                  }
+
+                remaining = (apr_size_t)serf_bucket_get_remaining(body);
+                if (frameflags & HTTP2_FLAG_ACK)
+                  {
+                    if (remaining != 0)
+                      {
+                        /* When this bit is set, the payload of the SETTINGS
+                           frame MUST be empty. Receipt of a SETTINGS frame
+                           with the ACK flag set and a length field value
+                           other than 0 MUST be treated as a connection error
+                           (Section 5.4.1) of type FRAME_SIZE_ERROR. */
+                        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+                      }
+                    h2->setting_acks++;
+                  }
+                else if ((remaining % HTTP2_SETTING_SIZE) != 0)
+                  {
+                    /* A SETTINGS frame with a length other than a multiple of
+                       6 octets MUST be treated as a connection error (Section
+                       5.4.1) of type FRAME_SIZE_ERROR. */
+                    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+                  }
+                else
+                  {
+                    /* Just read everything... We checked it against our
+                       max-framesize */
+                    body = serf_bucket_prefix_create(body, remaining,
+                                                     http2_handle_settings, h2,
+                                                     h2->allocator);
+                  }
+
+                /* Just reading will do the right thing now */
+                process_bucket = body;
+                break;
+
+      /* ---------------------------------------------------- */
+              case HTTP2_FRAME_TYPE_PING:
+                if (sid != 0)
+                  {
+                    /* PING frames are not associated with any individual
+                       stream.  If a PING frame is received with a stream
+                       identifier field value other than 0x0, the recipient
+                       MUST respond with a connection error (Section 5.4.1)
+                       of type PROTOCOL_ERROR.*/
+                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+                  }
+                else if (serf_bucket_get_remaining(body)
+                                                  != HTTP2_PING_DATA_SIZE)
+                  {
+                    /* Receipt of a PING frame with a length field value other
+                       than 8 MUST be treated as a connection error (Section
+                       5.4.1) of type FRAME_SIZE_ERROR.. */
+                    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+                  }
+
+                body = serf_bucket_prefix_create(body, HTTP2_PING_DATA_SIZE,
+                                                 (frameflags & HTTP2_FLAG_ACK)
+                                                   ? http2_handle_ping
+                                                   : http2_handle_ping_ack,
+                                                 h2, h2->allocator);
+
+                /* Just reading will do the right thing now */
+                process_bucket = body;
+                break;
+      /* ---------------------------------------------------- */
+              case HTTP2_FRAME_TYPE_GOAWAY:
+                if (sid != 0)
+                  {
+                    /* The GOAWAY frame applies to the connection, not a
+                       specific stream. An endpoint MUST treat a GOAWAY frame
+                       with a stream identifier other than 0x0 as a connection
+                       error(Section 5.4.1) of type PROTOCOL_ERROR. */
+                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+                  }
+
+                /* As the final go-away frame is best effort only we are not
+                   checking the bodysize against HTTP2_GOWAWAY_DATA_SIZE here.
+                   We'll see what we get in the goaway handler.
+
+                   Go away frames may contain additional opaque debug
+                   information at the end, so instead of reading
+                   HTTP2_GOWAWAY_DATA_SIZE bytes, we just read the whole frame.
+                 */
+                remaining = (apr_size_t)serf_bucket_get_remaining(body);
+
+                body = serf_bucket_prefix_create(body, remaining,
+                                                 http2_handle_goaway, h2,
+                                                 h2->allocator);
+
+                /* Just reading will do the right thing now */
+                process_bucket = body;
+                break;
+      /* ---------------------------------------------------- */
+              case HTTP2_FRAME_TYPE_WINDOW_UPDATE:
+                if (serf_bucket_get_remaining(body)
+                                != HTTP2_WINDOW_UPDATE_DATA_SIZE)
+                  {
+                    /* A WINDOW_UPDATE frame with a length other than 4 octets
+                       MUST be treated as a connection error (Section 5.4.1)
+                       of type FRAME_SIZE_ERROR. */
+                    return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+                  }
+
+                if (sid == 0)
+                  {
+                    body = serf_bucket_prefix_create(
+                                  body,
+                                  HTTP2_WINDOW_UPDATE_DATA_SIZE,
+                                  http2_handle_connection_window_update, h2,
+                                  h2->allocator);
+                  }
+                else
+                  {
+                    stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);
+
+                    if (stream)
+                      body = serf_bucket_prefix_create(
+                                  body,
+                                  HTTP2_WINDOW_UPDATE_DATA_SIZE,
+                                  http2_handle_stream_window_update, stream,
+                                  h2->allocator);
+                  }
+
+                /* Just reading will do the right thing now */
+                process_bucket = body;
+                break;
+
+      /* ---------------------------------------------------- */
+              case HTTP2_FRAME_TYPE_CONTINUATION:
+                if (!h2->continuation_bucket
+                    || sid != h2->continuation_streamid)
+                  {
+                    /* A CONTINUATION frame MUST be preceded by a HEADERS,
+                       PUSH_PROMISE or CONTINUATION frame without the
+                       END_HEADERS flag set. A recipient that observes
+                       violation of this rule MUST respond with a connection
+                       error(Section 5.4.1) of type PROTOCOL_ERROR. */
+                    h2->continuation_bucket = NULL;
+                    h2->continuation_streamid = 0;
+                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+                  }
+
+                serf_bucket_aggregate_append(h2->continuation_bucket, body);
+
+                if (frameflags & HTTP2_FLAG_END_HEADERS)
+                  {
+                    h2->continuation_bucket = NULL;
+                    h2->continuation_streamid = 0;
+                  }
+
+                return APR_SUCCESS;
+
+      /* ---------------------------------------------------- */
+              default:
+                /* We explicitly ignore all other frames as required,
+                   so reading will do the right thing now */
+                process_bucket = body;
+            } /* switch */
+
+          if (body)
+            serf_bucket_set_config(body, h2->config);
+
+          SERF_H2_assert(h2->processor == NULL);
+
+          if (process_handler)
+            {
+              h2->processor = process_handler;
+              h2->processor_baton = process_baton;
+            }
+          else
+            {
+              SERF_H2_assert(process_bucket != NULL);
+              h2->processor = http2_bucket_processor;
+              h2->processor_baton = process_bucket;
+            }
+        }
+    } /* while(TRUE) */
 }
 
 static apr_status_t
@@ -510,7 +1241,7 @@ http2_protocol_read(serf_connection_t *c
       conn->ctx->dirty_pollset = 1;
     }
 
-  status = http2_read(conn);
+  status = http2_process(conn->protocol_baton);
 
   if (!status)
     return APR_SUCCESS;
@@ -601,8 +1332,8 @@ serf_http2__allocate_stream_id(void *bat
   */
   if (stream->streamid < 0)
     {
-      stream->streamid = stream->h2->next_local_streamid;
-      stream->h2->next_local_streamid += 2;
+      stream->streamid = stream->h2->lr_next_streamid;
+      stream->h2->lr_next_streamid += 2;
 
       if (stream->status == H2S_INIT)
         stream->status = H2S_IDLE;
@@ -628,7 +1359,7 @@ serf_http2__stream_get(serf_http2_protoc
   if (streamid < 0)
     return NULL;
 
-  for (stream = h2->first; stream; stream->next)
+  for (stream = h2->first; stream; stream = stream->next)
     {
       if (stream->streamid == streamid)
         {
@@ -640,13 +1371,13 @@ serf_http2__stream_get(serf_http2_protoc
     }
 
   if (create_for_remote
-      && (streamid & 0x01) == (h2->next_remote_streamid & 0x01))
+      && (streamid & 0x01) == (h2->rl_next_streamid & 0x01))
     {
       serf_http2_stream_t *rs;
       stream = serf_http2__stream_create(h2, streamid,
-                                         h2->default_lr_window,
-                                         h2->default_rl_window,
-                                         h2->conn->allocator);
+                                         h2->lr_default_window,
+                                         h2->rl_default_window,
+                                         h2->allocator);
 
       if (h2->first)
         {
@@ -657,10 +1388,10 @@ serf_http2__stream_get(serf_http2_protoc
       else
         h2->last = h2->first = stream;
 
-      if (streamid < h2->next_remote_streamid)
+      if (streamid < h2->rl_next_streamid)
         stream->status = H2S_CLOSED;
       else
-        h2->next_remote_streamid = (streamid + 2);
+        h2->rl_next_streamid = (streamid + 2);
 
       for (rs = h2->first; rs; rs = rs->next)
         {
@@ -681,3 +1412,11 @@ serf_http2__stream_get(serf_http2_protoc
     }
   return NULL;
 }
+
+apr_status_t
+serf_http2__enqueue_stream_reset(serf_http2_protocol_t *h2,
+                                 apr_int32_t streamid,
+                                 apr_status_t reason)
+{
+  return APR_SUCCESS;
+}

Modified: serf/trunk/protocols/http2_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1711679&r1=1711678&r2=1711679&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Sat Oct 31 19:53:57 2015
@@ -24,6 +24,13 @@
 #include "serf.h"
 #include "serf_private.h"
 
+#ifdef _DEBUG
+#include <assert.h>
+#define SERF_H2_assert(x) assert(x)
+#else
+#define SERF_H2_assert(x) ((void)0)
+#endif
+
 #define SERF_LOGHTTP2 \
     SERF_LOGCOMP_PROTOCOL, (__FILE__ ":" APR_STRINGIFY(__LINE__))
 
@@ -34,9 +41,19 @@ extern "C" {
 /* ********** HTTP2 Frame types ********** */
 
 /* The standard maximum framesize. Always supported */
-#define HTTP2_DEFAULT_MAX_FRAMESIZE 16384
+#define HTTP2_DEFAULT_MAX_FRAMESIZE     16384
 /* The default stream and connection window size before updates */
-#define HTTP2_DEFAULT_WINDOW_SIZE   65535
+#define HTTP2_DEFAULT_WINDOW_SIZE       65535
+#define HTTP2_DEFAULT_MAX_CONCURRENT    0xFFFFFFFF
+
+#define HTTP2_PRIORITY_DATA_SIZE        5
+#define HTTP2_RST_DATA_SIZE             4
+#define HTTP2_PROMISE_DATA_SIZE         4
+#define HTTP2_PING_DATA_SIZE            8
+#define HTTP2_GOWAWAY_DATA_SIZE         8
+#define HTTP2_WINDOW_UPDATE_DATA_SIZE   4
+
+#define HTTP2_SETTING_SIZE              6
 
 /* Frame type is an 8 bit unsigned integer */
 
@@ -87,17 +104,20 @@ extern "C" {
 
 /* ------------------------------------- */
 typedef struct serf_http2_protocol_t serf_http2_protocol_t;
+typedef struct serf_http2_stream_data_t serf_http2_stream_data_t;
+
 typedef struct serf_http2_stream_t
 {
   struct serf_http2_protocol_t *h2;
   serf_bucket_alloc_t *alloc;
 
+  /* Opaque implementation details */
+  serf_http2_stream_data_t *data;
+
   /* Linked list of currently existing streams */
   struct serf_http2_stream_t *next;
   struct serf_http2_stream_t *prev;
 
-  serf_request_t *request; /* May be NULL as streams may outlive requests */
-
   apr_int64_t lr_window; /* local->remote */
   apr_int64_t rl_window; /* remote->local */
 
@@ -119,6 +139,10 @@ typedef struct serf_http2_stream_t
   /* TODO: Priority, etc. */
 } serf_http2_stream_t;
 
+typedef apr_status_t (* serf_http2_processor_t)(void *baton,
+                                                serf_http2_protocol_t *h2,
+                                                serf_bucket_t *body);
+
 /* Enques an http2 frame for output */
 apr_status_t
 serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
@@ -133,6 +157,12 @@ serf_http2__stream_create(serf_http2_pro
                           apr_uint32_t rl_window,
                           serf_bucket_alloc_t *alloc);
 
+
+apr_status_t
+serf_http2__enqueue_stream_reset(serf_http2_protocol_t *h2,
+                                 apr_int32_t streamid,
+                                 apr_status_t reason);
+
 /* Allocates a new stream id for a stream.
    BATON is a serf_http2_stream_t * instance.
 
@@ -147,10 +177,10 @@ void
 serf_http2__stream_cleanup(serf_http2_stream_t *stream);
 
 serf_http2_stream_t *
-serf_http2__stream_get_by_id(serf_http2_protocol_t *h2,
-                             apr_int32_t streamid,
-                             int create_for_remote,
-                             int move_first);
+serf_http2__stream_get(serf_http2_protocol_t *h2,
+                       apr_int32_t streamid,
+                       int create_for_remote,
+                       int move_first);
 
 /* Sets up STREAM to handle REQUEST */
 apr_status_t
@@ -158,7 +188,33 @@ serf_http2__stream_setup_request(serf_ht
                                  serf_hpack_table_t *hpack_tbl,
                                  serf_request_t *request);
 
+apr_status_t
+serf_http2__stream_reset(serf_http2_stream_t *stream,
+                         apr_status_t reason,
+                         int local_reset);
+
+serf_bucket_t *
+serf_http2__stream_handle_hpack(serf_http2_stream_t *stream,
+                                serf_bucket_t *bucket,
+                                unsigned char frametype,
+                                int end_stream,
+                                apr_size_t max_entry_size,
+                                serf_hpack_table_t *hpack_tbl,
+                                serf_config_t *config,
+                                serf_bucket_alloc_t *allocator);
+
+serf_bucket_t *
+serf_http2__stream_handle_data(serf_http2_stream_t *stream,
+                               serf_bucket_t *bucket,
+                               unsigned char frametype,
+                               int end_stream,
+                               serf_config_t *config,
+                               serf_bucket_alloc_t *allocator);
 
+apr_status_t
+serf_http2__stream_processor(void *baton,
+                             serf_http2_protocol_t *h2,
+                             serf_bucket_t *bucket);
 
 #ifdef __cplusplus
 }

Modified: serf/trunk/protocols/http2_stream.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1711679&r1=1711678&r2=1711679&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Sat Oct 31 19:53:57 2015
@@ -29,6 +29,12 @@
 #include "protocols/http2_buckets.h"
 #include "protocols/http2_protocol.h"
 
+struct serf_http2_stream_data_t
+{
+  serf_request_t *request; /* May be NULL as streams may outlive requests */
+  serf_bucket_t *response_agg;
+};
+
 serf_http2_stream_t *
 serf_http2__stream_create(serf_http2_protocol_t *h2,
                           apr_int32_t streamid,
@@ -41,8 +47,12 @@ serf_http2__stream_create(serf_http2_pro
   stream->h2 = h2;
   stream->alloc = alloc;
 
+  stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data));
+
   stream->next = stream->prev = NULL;
-  stream->request = NULL;
+
+  stream->data->request = NULL;
+  stream->data->response_agg = NULL;
 
   stream->lr_window = lr_window;
   stream->rl_window = rl_window;
@@ -60,6 +70,14 @@ serf_http2__stream_create(serf_http2_pro
 void
 serf_http2__stream_cleanup(serf_http2_stream_t *stream)
 {
+  if (stream->data)
+    {
+      if (stream->data->response_agg)
+        serf_bucket_destroy(stream->data->response_agg);
+
+      serf_bucket_mem_free(stream->alloc, stream->data);
+      stream->data = NULL;
+    }
   serf_bucket_mem_free(stream->alloc, stream);
 }
 
@@ -72,7 +90,7 @@ serf_http2__stream_setup_request(serf_ht
   serf_bucket_t *hpack;
   serf_bucket_t *body;
 
-  stream->request = request;
+  stream->data->request = request;
 
   if (!request->req_bkt)
     {
@@ -112,3 +130,145 @@ serf_http2__stream_setup_request(serf_ht
 
   return APR_SUCCESS;
 }
+
+apr_status_t
+serf_http2__stream_reset(serf_http2_stream_t *stream,
+                         apr_status_t reason,
+                         int local_reset)
+{
+  stream->status = H2S_CLOSED;
+
+  if (stream->streamid < 0)
+    return APR_SUCCESS;
+
+  if (local_reset)
+    return serf_http2__enqueue_stream_reset(stream->h2,
+                                            stream->streamid,
+                                            reason);
+
+  return APR_SUCCESS;
+}
+
+apr_status_t
+stream_response_eof(void *baton,
+                    serf_bucket_t *aggregate_bucket)
+{
+  serf_http2_stream_t *stream = baton;
+
+  switch (stream->status)
+    {
+      case H2S_CLOSED:
+      case H2S_HALFCLOSED_REMOTE:
+        return APR_EOF;
+      default:
+        return APR_EAGAIN;
+    }
+}
+
+serf_bucket_t *
+serf_http2__stream_handle_hpack(serf_http2_stream_t *stream,
+                                serf_bucket_t *bucket,
+                                unsigned char frametype,
+                                int end_stream,
+                                apr_size_t max_entry_size,
+                                serf_hpack_table_t *hpack_tbl,
+                                serf_config_t *config,
+                                serf_bucket_alloc_t *allocator)
+{
+  if (!stream->data->response_agg)
+    {
+      stream->data->response_agg = serf_bucket_aggregate_create(stream->alloc);
+      serf_bucket_aggregate_hold_open(stream->data->response_agg,
+                                      stream_response_eof, stream);
+      serf_bucket_set_config(stream->data->response_agg, config);
+    }
+
+  bucket = serf__bucket_hpack_decode_create(bucket, NULL, NULL, max_entry_size,
+                                            hpack_tbl, allocator);
+
+  serf_bucket_aggregate_append(stream->data->response_agg, bucket);
+
+  if (end_stream)
+    {
+      if (stream->status == H2S_HALFCLOSED_LOCAL)
+        stream->status = H2S_CLOSED;
+      else
+        stream->status = H2S_HALFCLOSED_REMOTE;
+    }
+
+  return NULL;
+}
+
+serf_bucket_t *
+serf_http2__stream_handle_data(serf_http2_stream_t *stream,
+                               serf_bucket_t *bucket,
+                               unsigned char frametype,
+                               int end_stream,
+                               serf_config_t *config,
+                               serf_bucket_alloc_t *allocator)
+{
+  if (!stream->data->response_agg)
+    {
+      stream->data->response_agg = serf_bucket_aggregate_create(stream->alloc);
+      serf_bucket_aggregate_hold_open(stream->data->response_agg,
+                                      stream_response_eof, stream);
+
+      serf_bucket_set_config(stream->data->response_agg, config);
+    }
+
+  serf_bucket_aggregate_append(stream->data->response_agg, bucket);
+
+  if (end_stream)
+    {
+      if (stream->status == H2S_HALFCLOSED_LOCAL)
+        stream->status = H2S_CLOSED;
+      else
+        stream->status = H2S_HALFCLOSED_REMOTE;
+    }
+
+  return NULL;
+}
+
+apr_status_t
+serf_http2__stream_processor(void *baton,
+                             serf_http2_protocol_t *h2,
+                             serf_bucket_t *bucket)
+{
+  serf_http2_stream_t *stream = baton;
+  apr_status_t status = APR_SUCCESS;
+
+  if (!stream->data->response_agg)
+    return APR_EAGAIN;
+
+  /* ### TODO: Delegate to request */
+  while (!status)
+    {
+      const char *data;
+      apr_size_t len;
+
+      status = serf_bucket_read(stream->data->response_agg,
+                                SERF_READ_ALL_AVAIL, &data, &len);
+
+      if (!SERF_BUCKET_READ_ERROR(status))
+        {
+          char *printable = serf_bstrmemdup(bucket->allocator, data, len);
+          char *c;
+
+          for (c = printable; *c; c++)
+            {
+              if (((*c < ' ') || (*c > '\x7E')) && !strchr("\r\n", *c)) /* 
Poor mans isctrl*/
+                {
+                  *c = ' ';
+                }
+            }
+
+#ifdef _DEBUG
+          fputs(printable, stdout);
+#endif
+
+          serf_bucket_mem_free(bucket->allocator, printable);
+        }
+    }
+
+  return status;
+}


Reply via email to